This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 68382f281cbb7c9b814e9ceb5ed75db4614a7c84
Author: Willem Jiang <[email protected]>
AuthorDate: Tue Aug 28 09:11:41 2018 +0800

    SCB-818 Implements the TccEventServie of grpc
---
 omega/omega-connector/omega-connector-grpc/pom.xml |   6 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   3 +-
 ...rver.java => GrpcCoordinateStreamObserver.java} |  37 +--
 .../omega/connector/grpc/GrpcTccEventService.java  | 156 +++++++++++++
 .../connector/grpc/GrpcTccEventServiceTest.java    | 260 +++++++++++++++++++++
 pom.xml                                            |   5 +
 6 files changed, 440 insertions(+), 27 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml 
b/omega/omega-connector/omega-connector-grpc/pom.xml
index 7a14097..f4d7bf1 100644
--- a/omega/omega-connector/omega-connector-grpc/pom.xml
+++ b/omega/omega-connector/omega-connector-grpc/pom.xml
@@ -18,7 +18,6 @@
   ~
   ~
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0";
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
@@ -59,6 +58,11 @@
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 9d9c312..6aae96a 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -59,11 +59,12 @@ class GrpcCompensateStreamObserver implements 
StreamObserver<GrpcCompensateComma
 
   @Override
   public void onError(Throwable t) {
-    LOG.error("failed to process grpc compensate command.", t);
+    LOG.error("Failed to process grpc compensate command.", t);
     errorHandler.run();
   }
 
   @Override
   public void onCompleted() {
+    // Do nothing here
   }
 }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
similarity index 54%
copy from 
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
copy to 
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
index 9d9c312..20f5974 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,56 +13,44 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
- *
  */
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import java.lang.invoke.MethodHandles;
 
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
 
-class GrpcCompensateStreamObserver implements 
StreamObserver<GrpcCompensateCommand> {
+public class GrpcCoordinateStreamObserver implements 
StreamObserver<GrpcTccCoordinateCommand> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private final MessageHandler messageHandler;
-  private final Runnable errorHandler;
-  private final MessageDeserializer deserializer;
+  
 
-  GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable 
errorHandler, MessageDeserializer deserializer) {
+  public GrpcCoordinateStreamObserver(MessageHandler messageHandler) {
     this.messageHandler = messageHandler;
-    this.errorHandler = errorHandler;
-    this.deserializer = deserializer;
   }
 
   @Override
-  public void onNext(GrpcCompensateCommand command) {
-    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, 
compensation method: {}",
-        command.getGlobalTxId(), command.getLocalTxId(), 
command.getCompensationMethod());
-
-    messageHandler.onReceive(
-        command.getGlobalTxId(),
-        command.getLocalTxId(),
-        command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensationMethod(),
-        deserializer.deserialize(command.getPayloads().toByteArray()));
+  public void onNext(GrpcTccCoordinateCommand command) {
+    LOG.info("Received coordinate command, global tx id: {}, local tx id: {}, 
call method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getMethod());
+    messageHandler.onReceive(command.getGlobalTxId(), command.getLocalTxId(), 
command.getParentTxId(), command.getMethod());
   }
 
   @Override
   public void onError(Throwable t) {
-    LOG.error("failed to process grpc compensate command.", t);
-    errorHandler.run();
+    //TODO need to find a way to handle the error
+    LOG.error("Failed to process grpc coordinate command.", t);
   }
 
   @Override
   public void onCompleted() {
+    // Do nothing here
   }
 }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
new file mode 100644
index 0000000..cfcb945
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+
+import io.grpc.ManagedChannel;
+
+public class GrpcTccEventService implements TccEventService {
+  private final GrpcServiceConfig serviceConfig;
+  private final String target;
+  private final TccEventServiceBlockingStub tccBlockingEventService;
+  private final TccEventServiceStub tccAsyncEventService;
+  private final GrpcCoordinateStreamObserver observer;
+
+  public GrpcTccEventService(ServiceConfig serviceConfig,
+      ManagedChannel channel,
+      String address,
+      MessageHandler handler
+      ) {
+    this.target = address;
+    tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
+    tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
+    this.serviceConfig = serviceConfig(serviceConfig.serviceName(), 
serviceConfig.instanceId());
+    observer = new GrpcCoordinateStreamObserver(handler);
+  }
+
+  @Override
+  public void onConnected() {
+    tccAsyncEventService.onConnected(serviceConfig, observer);
+  }
+
+  @Override
+  public void onDisconnected() {
+    tccBlockingEventService.onDisconnected(serviceConfig);
+  }
+
+  @Override
+  public void close() {
+    // do nothing here
+  }
+
+  @Override
+  public String target() {
+    return target;
+  }
+
+  @Override
+  public AlphaResponse participate(ParticipatedEvent participateEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.participate(convertTo(participateEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  @Override
+  public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+
+  @Override
+  public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+
+  }
+
+  @Override
+  public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent) 
{
+    return GrpcTccCoordinatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(coordinatedEvent.getGlobalTxId())
+        .setLocalTxId(coordinatedEvent.getLocalTxId())
+        .setParentTxId(coordinatedEvent.getParentTxId())
+        .setMethodName(coordinatedEvent.getMethodName())
+        .setStatus(coordinatedEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcServiceConfig serviceConfig(String serviceName, String 
instanceId) {
+    return GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .build();
+  }
+
+  private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent 
tccStartEvent) {
+    return GrpcTccTransactionStartedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccStartEvent.getGlobalTxId())
+        .setLocalTxId(tccStartEvent.getLocalTxId())
+        .build();
+  }
+
+  private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) {
+    return GrpcTccTransactionEndedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccEndEvent.getGlobalTxId())
+        .setLocalTxId(tccEndEvent.getLocalTxId())
+        .setStatus(tccEndEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcTccParticipatedEvent convertTo(ParticipatedEvent 
participateEvent) {
+    return GrpcTccParticipatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(participateEvent.getGlobalTxId())
+        .setLocalTxId(participateEvent.getLocalTxId())
+        .setParentTxId(participateEvent.getParentTxId())
+        .setCancelMethod(participateEvent.getCancelMethod())
+        .setConfirmMethod(participateEvent.getConfirmMethod())
+        .setStatus(participateEvent.getStatus().toString())
+        .build();
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
new file mode 100644
index 0000000..b4cfef2
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceImplBase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+
+@RunWith(JUnit4.class)
+public class GrpcTccEventServiceTest {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new 
MutableHandlerRegistry();
+  private final GrpcAck ack = GrpcAck.newBuilder().setAborted(false).build();
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String methodName = uniquify("methodName");
+  private final String confirmMethod = uniquify("confirmMethod");
+  private final String cancelMethod = uniquify("cancleMethod");
+  private final String serviceName = uniquify("serviceName");
+
+  private final ServiceConfig serviceConfig = new 
ServiceConfig(uniquify("Service"));
+  private final String address = uniquify("Address");
+  private final MessageHandler handler = mock(MessageHandler.class);
+  private GrpcTccEventService service;
+
+  @Before
+  public void setUp() throws Exception {
+    // Generate a unique in-process server name.
+    String serverName = InProcessServerBuilder.generateName();
+
+    // Create a server, add service, start, and register for automatic 
graceful shutdown.
+    grpcCleanup.register(InProcessServerBuilder.forName(serverName).
+        
fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
+
+    // Create a client channel and register for automatic graceful shutdown.
+    ManagedChannel channel = grpcCleanup.register(
+        InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+    // Create a TccEventServiceStub using the in-process channel;
+    service = new GrpcTccEventService(serviceConfig, channel, address, 
handler);
+  }
+
+  @Test
+  public void serviceOnDisconnectedTest() {
+
+    final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1];
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onDisconnected(GrpcServiceConfig request, 
StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    service.onDisconnected();
+
+    assertThat(requestCaptor[0].getServiceName(), 
is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), 
is(serviceConfig.instanceId()));
+  }
+
+
+  @Test
+  public void serviceOnConnectedTest() {
+    final GrpcTccCoordinateCommand coordinateCommand =
+        GrpcTccCoordinateCommand.newBuilder()
+            .setGlobalTxId(globalTxId)
+            .setLocalTxId(localTxId)
+            .setParentTxId(parentTxId)
+            .setMethod(methodName)
+            .setServiceName(serviceName)
+        .build();
+
+    final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1];
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+      @Override
+      public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+        requestCaptor[0] = request;
+        // Just send the coordinateCommand back
+        responseObserver.onNext(coordinateCommand);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    service.onConnected();
+
+    assertThat(requestCaptor[0].getServiceName(), 
is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), 
is(serviceConfig.instanceId()));
+
+    verify(handler).onReceive(globalTxId, localTxId,parentTxId, methodName);
+
+  }
+
+  @Test
+  public void serviceOnTransactionStartTest() {
+
+    final GrpcTccTransactionStartedEvent[] requestCaptor = new 
GrpcTccTransactionStartedEvent[1];
+    TccStartedEvent event = new TccStartedEvent(globalTxId,localTxId);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccTransactionStarted(GrpcTccTransactionStartedEvent 
request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.tccTransactionStart(event);
+
+    assertThat(requestCaptor[0].getServiceName(), 
is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), 
is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnTransactionEndTest() {
+
+    final GrpcTccTransactionEndedEvent[] requestCaptor = new 
GrpcTccTransactionEndedEvent[1];
+    TccEndedEvent event = new TccEndedEvent(globalTxId,localTxId, 
TransactionStatus.Failed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.tccTransactionStop(event);
+
+    assertThat(requestCaptor[0].getServiceName(), 
is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), 
is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getStatus(), 
is(TransactionStatus.Failed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnParticipateTest() {
+
+    final GrpcTccParticipatedEvent[] requestCaptor = new 
GrpcTccParticipatedEvent[1];
+    ParticipatedEvent event = new ParticipatedEvent(globalTxId,localTxId, 
parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void participate(GrpcTccParticipatedEvent request,
+          StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.participate(event);
+
+    assertThat(requestCaptor[0].getServiceName(), 
is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), 
is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getParentTxId(), is(parentTxId));
+    assertThat(requestCaptor[0].getCancelMethod(), is(cancelMethod));
+    assertThat(requestCaptor[0].getConfirmMethod(), is(confirmMethod));
+    assertThat(requestCaptor[0].getStatus(), 
is(TransactionStatus.Succeed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnCoordinateTest() {
+
+    final GrpcTccCoordinatedEvent[] requestCaptor = new 
GrpcTccCoordinatedEvent[1];
+    CoordinatedEvent event = new CoordinatedEvent(globalTxId,localTxId, 
parentTxId, methodName, TransactionStatus.Succeed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccCoordinated(GrpcTccCoordinatedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.coordinate(event);
+
+    assertThat(requestCaptor[0].getServiceName(), 
is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), 
is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getMethodName(), is(methodName));
+    assertThat(requestCaptor[0].getStatus(), 
is(TransactionStatus.Succeed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+
+
+}
diff --git a/pom.xml b/pom.xml
index 83e5366..57ab449 100644
--- a/pom.xml
+++ b/pom.xml
@@ -503,6 +503,11 @@
         <version>${grpc.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-testing</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+      <dependency>
         <groupId>com.esotericsoftware</groupId>
         <artifactId>kryo</artifactId>
         <version>${kryo.version}</version>

Reply via email to