[ 
https://issues.apache.org/jira/browse/SCB-909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623113#comment-16623113
 ] 

ASF GitHub Bot commented on SCB-909:
------------------------------------

WillemJiang closed pull request #302: SCB-909 Add fault tolerance for service 
comb TCC  (WIP)
URL: https://github.com/apache/incubator-servicecomb-saga/pull/302
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
index a8641b43..99bff3f8 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
@@ -19,9 +19,11 @@
 
 import java.util.Collections;
 import java.util.List;
+import org.apache.servicecomb.saga.omega.connector.grpc.tcc.TccMessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 
 public class AlphaClusterConfig {
 
@@ -43,6 +45,8 @@
 
   private MessageHandler messageHandler;
 
+  private TccMessageHandler tccMessageHandler;
+
   /**
    * @deprecated Use {@link Builder} instead.
    */
@@ -65,7 +69,8 @@ private AlphaClusterConfig(List<String> addresses, boolean 
enableSSL, boolean en
       String cert, String key, String certChain,
       MessageSerializer messageSerializer,
       MessageDeserializer messageDeserializer,
-      MessageHandler messageHandler) {
+      MessageHandler messageHandler,
+      TccMessageHandler tccMessageHandler) {
     this.addresses = addresses;
     this.enableSSL = enableSSL;
     this.enableMutualAuth = enableMutualAuth;
@@ -75,6 +80,7 @@ private AlphaClusterConfig(List<String> addresses, boolean 
enableSSL, boolean en
     this.messageSerializer = messageSerializer;
     this.messageDeserializer = messageDeserializer;
     this.messageHandler = messageHandler;
+    this.tccMessageHandler = tccMessageHandler;
   }
 
   public static Builder builder() {
@@ -92,6 +98,7 @@ public static Builder builder() {
     private MessageSerializer messageSerializer;
     private MessageDeserializer messageDeserializer;
     private MessageHandler messageHandler;
+    private TccMessageHandler tccMessageHandler;
 
     public Builder addresses(List<String> addresses) {
       this.addresses = addresses;
@@ -138,6 +145,11 @@ public Builder messageHandler(MessageHandler 
messageHandler) {
       return this;
     }
 
+    public Builder tccMessageHandler(TccMessageHandler tccMessageHandler) {
+      this.tccMessageHandler = tccMessageHandler;
+      return this;
+    }
+
 
     public AlphaClusterConfig build() {
       return new AlphaClusterConfig(this.addresses,
@@ -148,7 +160,8 @@ public AlphaClusterConfig build() {
           this.certChain,
           this.messageSerializer,
           this.messageDeserializer,
-          messageHandler);
+          messageHandler,
+          tccMessageHandler);
     }
   }
 
@@ -187,4 +200,8 @@ public MessageDeserializer getMessageDeserializer() {
   public MessageHandler getMessageHandler() {
     return messageHandler;
   }
+
+  public TccMessageHandler getTccMessageHandler() {
+    return tccMessageHandler;
+  }
 }
\ No newline at end of file
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/FastestSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/FastestSender.java
new file mode 100644
index 00000000..ed758402
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/FastestSender.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import com.google.common.base.Supplier;
+import java.util.Map;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+
+/**
+ * The strategy of picking the fastest {@link MessageSender}
+ */
+public class FastestSender implements MessageSenderPicker {
+
+  @Override
+  public MessageSender pick(Map<MessageSender, Long> messageSenders,
+      Supplier<MessageSender> defaultSender) {
+    Long min = Long.MAX_VALUE;
+    MessageSender sender = null;
+    for (Map.Entry<MessageSender, Long> entry : messageSenders.entrySet()) {
+      if (entry.getValue() != Long.MAX_VALUE && min > entry.getValue()) {
+        min = entry.getValue();
+        sender = entry.getKey();
+      }
+    }
+    if (sender == null) {
+      return defaultSender.get();
+    } else {
+      return sender;
+    }
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
index cfcb9450..c619584a 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
@@ -17,9 +17,10 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
+import 
org.apache.servicecomb.saga.omega.connector.grpc.tcc.GrpcCoordinateStreamObserver;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
@@ -48,13 +49,13 @@
   public GrpcTccEventService(ServiceConfig serviceConfig,
       ManagedChannel channel,
       String address,
-      MessageHandler handler
+      TccMessageHandler handler
       ) {
     this.target = address;
     tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
     tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), 
serviceConfig.instanceId());
-    observer = new GrpcCoordinateStreamObserver(handler);
+    observer = new GrpcCoordinateStreamObserver(null, null, handler);
   }
 
   @Override
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index a2933447..c3507555 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -240,26 +240,3 @@ private static SslContext 
buildSslContext(AlphaClusterConfig clusterConfig) thro
   }
 }
 
-/**
- * The strategy of picking the fastest {@link MessageSender}
- */
-class FastestSender implements MessageSenderPicker {
-
-  @Override
-  public MessageSender pick(Map<MessageSender, Long> messageSenders,
-      Supplier<MessageSender> defaultSender) {
-    Long min = Long.MAX_VALUE;
-    MessageSender sender = null;
-    for (Map.Entry<MessageSender, Long> entry : messageSenders.entrySet()) {
-      if (entry.getValue() != Long.MAX_VALUE && min > entry.getValue()) {
-        min = entry.getValue();
-        sender = entry.getKey();
-      }
-    }
-    if (sender == null) {
-      return defaultSender.get();
-    } else {
-      return sender;
-    }
-  }
-}
\ No newline at end of file
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index 02571fde..6faffcb2 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -19,21 +19,22 @@
 
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
-
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PushBackReconnectRunnable implements Runnable {
+public class PushBackReconnectRunnable implements Runnable {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final MessageSender messageSender;
   private final Map<MessageSender, Long> senders;
+
   private final BlockingQueue<Runnable> pendingTasks;
 
   private final BlockingQueue<MessageSender> connectedSenders;
 
-  PushBackReconnectRunnable(
+  public PushBackReconnectRunnable(
       MessageSender messageSender,
       Map<MessageSender, Long> senders,
       BlockingQueue<Runnable> pendingTasks,
@@ -58,4 +59,21 @@ public void run() {
       pendingTasks.offer(this);
     }
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PushBackReconnectRunnable that = (PushBackReconnectRunnable) o;
+    return Objects.equals(messageSender, that.messageSender);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(messageSender);
+  }
 }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
similarity index 52%
rename from 
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
rename to 
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
index 31943b83..d69930c3 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
@@ -6,33 +6,34 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
 
 import java.lang.invoke.MethodHandles;
-
-import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.grpc.stub.StreamObserver;
-
-public class GrpcCoordinateStreamObserver implements 
StreamObserver<GrpcTccCoordinateCommand> {
+public class GrpcCoordinateStreamObserver extends 
ReconnectStreamObserver<GrpcTccCoordinateCommand> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final MessageHandler messageHandler;
-  
 
-  public GrpcCoordinateStreamObserver(MessageHandler messageHandler) {
+  private final TccMessageHandler messageHandler;
+
+  public GrpcCoordinateStreamObserver(
+      LoadBalanceContext loadBalanceContext, MessageSender messageSender,
+      TccMessageHandler messageHandler) {
+    super(loadBalanceContext, messageSender);
     this.messageHandler = messageHandler;
   }
 
@@ -42,15 +43,4 @@ public void onNext(GrpcTccCoordinateCommand command) {
         command.getGlobalTxId(), command.getLocalTxId(), command.getMethod());
     messageHandler.onReceive(command.getGlobalTxId(), command.getLocalTxId(), 
command.getParentTxId(), command.getMethod());
   }
-
-  @Override
-  public void onError(Throwable t) {
-    //TODO need to find a way to handle the error and create connection again
-    LOG.error("Failed to process grpc coordinate command.", t);
-  }
-
-  @Override
-  public void onCompleted() {
-    // Do nothing here
-  }
 }
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcOnErrorHandler.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcOnErrorHandler.java
new file mode 100644
index 00000000..8e9b26f5
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcOnErrorHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import com.google.common.base.Supplier;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import 
org.apache.servicecomb.saga.omega.connector.grpc.PushBackReconnectRunnable;
+import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+
+public class GrpcOnErrorHandler {
+
+  private final BlockingQueue<Runnable> pendingTasks;
+
+  private final Map<MessageSender, Long> senders;
+
+  private final GrpcRetryContext grpcRetryContext;
+
+  public GrpcOnErrorHandler(BlockingQueue<Runnable> pendingTasks, 
Map<MessageSender, Long> senders) {
+    this.pendingTasks = pendingTasks;
+    this.senders = senders;
+    this.grpcRetryContext = new GrpcRetryContext();
+  }
+
+  public void handle(MessageSender messageSender) {
+    final Runnable runnable = new PushBackReconnectRunnable(
+        messageSender,
+        senders,
+        pendingTasks,
+        grpcRetryContext.getReconnectedSenders()
+    );
+    synchronized (pendingTasks) {
+      if (!pendingTasks.contains(runnable)) {
+        pendingTasks.offer(runnable);
+      }
+    }
+  }
+
+  public GrpcRetryContext getGrpcRetryContext() {
+    return grpcRetryContext;
+  }
+
+  public static class GrpcRetryContext {
+
+    private final BlockingQueue<MessageSender> reconnectedSenders = new 
LinkedBlockingQueue<>();
+
+    private final MessageSender retryMessageSender = new 
RetryableMessageSender(reconnectedSenders);
+
+    private final Supplier<MessageSender> defaultMessageSender = new 
Supplier<MessageSender>() {
+      @Override
+      public MessageSender get() {
+        return retryMessageSender;
+      }
+    };
+
+    public BlockingQueue<MessageSender> getReconnectedSenders() {
+      return reconnectedSenders;
+    }
+
+    public Supplier<MessageSender> getDefaultMessageSender() {
+      return defaultMessageSender;
+    }
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
new file mode 100644
index 00000000..e23f366b
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import io.grpc.ManagedChannel;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+
+public class GrpcTccClientMessageSender implements TccMessageSender {
+
+  private final GrpcServiceConfig serviceConfig;
+  private final String target;
+  private final TccEventServiceBlockingStub tccBlockingEventService;
+  private final TccEventServiceStub tccAsyncEventService;
+  private final GrpcCoordinateStreamObserver observer;
+
+  public GrpcTccClientMessageSender(ServiceConfig serviceConfig,
+      ManagedChannel channel,
+      String address,
+      TccMessageHandler handler,
+      LoadBalanceContext loadContext) {
+    this.target = address;
+    tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
+    tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
+    this.serviceConfig = serviceConfig(serviceConfig.serviceName(), 
serviceConfig.instanceId());
+    observer = new GrpcCoordinateStreamObserver(loadContext, this, handler);
+  }
+
+  @Override
+  public void onConnected() {
+    tccAsyncEventService.onConnected(serviceConfig, observer);
+  }
+
+  @Override
+  public void onDisconnected() {
+    tccBlockingEventService.onDisconnected(serviceConfig);
+  }
+
+  @Override
+  public void close() {
+    // do nothing here
+  }
+
+  @Override
+  public String target() {
+    return target;
+  }
+
+  @Override
+  public AlphaResponse participate(ParticipatedEvent participateEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.participate(convertTo(participateEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  @Override
+  public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+
+  @Override
+  public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+
+  }
+
+  @Override
+  public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+    GrpcAck grpcAck = 
tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent) 
{
+    return GrpcTccCoordinatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(coordinatedEvent.getGlobalTxId())
+        .setLocalTxId(coordinatedEvent.getLocalTxId())
+        .setParentTxId(coordinatedEvent.getParentTxId())
+        .setMethodName(coordinatedEvent.getMethodName())
+        .setStatus(coordinatedEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcServiceConfig serviceConfig(String serviceName, String 
instanceId) {
+    return GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .build();
+  }
+
+  private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent 
tccStartEvent) {
+    return GrpcTccTransactionStartedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccStartEvent.getGlobalTxId())
+        .setLocalTxId(tccStartEvent.getLocalTxId())
+        .build();
+  }
+
+  private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) {
+    return GrpcTccTransactionEndedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccEndEvent.getGlobalTxId())
+        .setLocalTxId(tccEndEvent.getLocalTxId())
+        .setStatus(tccEndEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcTccParticipatedEvent convertTo(ParticipatedEvent 
participateEvent) {
+    return GrpcTccParticipatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(participateEvent.getGlobalTxId())
+        .setLocalTxId(participateEvent.getLocalTxId())
+        .setParentTxId(participateEvent.getParentTxId())
+        .setCancelMethod(participateEvent.getCancelMethod())
+        .setConfirmMethod(participateEvent.getConfirmMethod())
+        .setStatus(participateEvent.getStatus().toString())
+        .build();
+  }
+
+  @Override
+  public AlphaResponse send(TxEvent event) {
+    return null;
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContext.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContext.java
new file mode 100644
index 00000000..3831a921
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import io.grpc.ManagedChannel;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+
+public class LoadBalanceContext {
+
+  private Map<MessageSender, Long> senders;
+
+  private final Collection<ManagedChannel> channels;
+
+  private final PendingTaskRunner pendingTaskRunner;
+
+  private final GrpcOnErrorHandler grpcOnErrorHandler;
+
+  public LoadBalanceContext(Map<MessageSender, Long> senders,
+      Collection<ManagedChannel> channels, int reconnectDelay) {
+    this.senders = senders;
+    this.channels = channels;
+    this.pendingTaskRunner = new PendingTaskRunner(reconnectDelay);
+    this.grpcOnErrorHandler = new 
GrpcOnErrorHandler(pendingTaskRunner.getPendingTasks(), senders);
+    pendingTaskRunner.start();
+  }
+
+  public Map<MessageSender, Long> getSenders() {
+    return senders;
+  }
+
+  public Collection<ManagedChannel> getChannels() {
+    return channels;
+  }
+
+  public PendingTaskRunner getPendingTaskRunner() {
+    return pendingTaskRunner;
+  }
+
+  public GrpcOnErrorHandler getGrpcOnErrorHandler() {
+    return grpcOnErrorHandler;
+  }
+
+  public void setSenders(Map<MessageSender, Long> senders) {
+    this.senders = senders;
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilder.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilder.java
new file mode 100644
index 00000000..0b22ab60
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import com.google.common.base.Optional;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.net.ssl.SSLException;
+import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+
+public class LoadBalanceContextBuilder {
+
+  private final AlphaClusterConfig clusterConfig;
+
+  private final ServiceConfig serviceConfig;
+
+  private final int reconnectDelay;
+
+  private final TransactionType transactionType;
+
+  public LoadBalanceContextBuilder(TransactionType transactionType,
+      AlphaClusterConfig clusterConfig, ServiceConfig serviceConfig, int 
reconnectDelay) {
+    this.transactionType = transactionType;
+    this.clusterConfig = clusterConfig;
+    this.serviceConfig = serviceConfig;
+    this.reconnectDelay = reconnectDelay;
+  }
+
+  public LoadBalanceContext build() {
+
+    if (clusterConfig.getAddresses().isEmpty()) {
+      throw new IllegalArgumentException("No reachable cluster address 
provided");
+    }
+
+    Optional<SslContext> sslContext = buildSslContext(clusterConfig);
+    Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
+    Collection<ManagedChannel> channels = new 
ArrayList<>(clusterConfig.getAddresses().size());
+    LoadBalanceContext loadContext = new LoadBalanceContext(senders, channels, 
reconnectDelay);
+
+    for (String address : clusterConfig.getAddresses()) {
+      ManagedChannel channel = buildChannel(address, sslContext);
+      channels.add(channel);
+      MessageSender messageSender = buildSender(address, channel, 
clusterConfig, serviceConfig, loadContext);
+      senders.put(messageSender, 0L);
+    }
+    return loadContext;
+  }
+
+  private ManagedChannel buildChannel(String address, Optional<SslContext> 
sslContext) {
+    if (sslContext.isPresent()) {
+      return NettyChannelBuilder.forTarget(address)
+          .negotiationType(NegotiationType.TLS)
+          .sslContext(sslContext.get())
+          .build();
+    } else {
+      return ManagedChannelBuilder
+          .forTarget(address).usePlaintext()
+          .build();
+    }
+  }
+
+  private MessageSender buildSender(
+      String address, ManagedChannel channel, AlphaClusterConfig clusterConfig,
+      ServiceConfig serviceConfig, LoadBalanceContext loadContext) {
+    switch (transactionType) {
+      case TCC:
+        return new GrpcTccClientMessageSender(
+            serviceConfig,
+            channel,
+            address,
+            clusterConfig.getTccMessageHandler(),
+            loadContext);
+
+      case SAGA:
+//        return new GrpcClientMessageSender(
+//            address,
+//            channel,
+//            clusterConfig.getMessageSerializer(),
+//            clusterConfig.getMessageDeserializer(),
+//            serviceConfig,
+//            new LoadBalancedClusterMessageSender()ErrorHandlerFactory(),
+//            clusterConfig.getMessageHandler());
+        default:
+    }
+      return null;
+  }
+
+  private Optional<SslContext> buildSslContext(AlphaClusterConfig 
clusterConfig) {
+    if (!clusterConfig.isEnableSSL()) {
+      return Optional.absent();
+    }
+
+    SslContextBuilder builder = GrpcSslContexts.forClient();
+    // openssl must be used because some older JDk does not support cipher 
suites required by http2,
+    // and the performance of JDK ssl is pretty low compared to openssl.
+    builder.sslProvider(SslProvider.OPENSSL);
+
+    Properties prop = new Properties();
+    try {
+      
prop.load(LoadBalanceContextBuilder.class.getClassLoader().getResourceAsStream("ssl.properties"));
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Unable to read ssl.properties.", e);
+    }
+
+    builder.protocols(prop.getProperty("protocols").split(","));
+    builder.ciphers(Arrays.asList(prop.getProperty("ciphers").split(",")));
+    builder.trustManager(new File(clusterConfig.getCertChain()));
+
+    if (clusterConfig.isEnableMutualAuth()) {
+      builder.keyManager(new File(clusterConfig.getCert()), new 
File(clusterConfig.getKey()));
+    }
+
+    try {
+      return Optional.of(builder.build());
+    } catch (SSLException e) {
+      throw new IllegalArgumentException("Unable to build SslContext", e);
+    }
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderAdapter.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderAdapter.java
new file mode 100644
index 00000000..b7a8ef2c
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import com.google.common.base.Optional;
+import io.grpc.ManagedChannel;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.omega.connector.grpc.MessageSenderPicker;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class LoadBalanceSenderAdapter implements MessageSender {
+
+  private final LoadBalanceContext loadContext;
+
+  private final MessageSenderPicker senderPicker;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public LoadBalanceSenderAdapter(
+      LoadBalanceContext loadContext,
+      MessageSenderPicker senderPicker) {
+    this.loadContext = loadContext;
+    this.senderPicker = senderPicker;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> T pickMessageSender() {
+    return (T) senderPicker.pick(loadContext.getSenders(),
+        
loadContext.getGrpcOnErrorHandler().getGrpcRetryContext().getDefaultMessageSender());
+  }
+
+  public <T> Optional<AlphaResponse> doGrpcSend(MessageSender messageSender, T 
event, SenderExecutor<T> executor) {
+    AlphaResponse response = null;
+    try {
+      long startTime = System.nanoTime();
+      response = executor.apply(event);
+      loadContext.getSenders().put(messageSender, System.nanoTime() - 
startTime);
+    } catch (OmegaException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Retry sending event {} due to failure", event, e);
+      loadContext.getSenders().put(messageSender, Long.MAX_VALUE);
+//      loadContext.getGrpcOnErrorHandler().handle(messageSender);
+    }
+    return Optional.fromNullable(response);
+  }
+
+  @Override
+  public void onConnected() {
+    for(MessageSender sender : loadContext.getSenders().keySet()){
+      try {
+        sender.onConnected();
+      } catch (Exception e) {
+        LOG.error("Failed connecting to alpha at {}", sender.target(), e);
+      }
+    }
+  }
+
+  @Override
+  public void onDisconnected() {
+    for (MessageSender sender : loadContext.getSenders().keySet()) {
+      try {
+        sender.onDisconnected();
+      } catch (Exception e) {
+        LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    loadContext.getPendingTaskRunner().shutdown();
+    for(ManagedChannel channel : loadContext.getChannels()) {
+      channel.shutdownNow();
+    }
+  }
+
+  @Override
+  public String target() {
+    return "UNKNOWN";
+  }
+
+  @Override
+  public AlphaResponse send(TxEvent event) {
+    return null;
+  }
+
+  public MessageSenderPicker getSenderPicker() {
+    return senderPicker;
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/PendingTaskRunner.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/PendingTaskRunner.java
new file mode 100644
index 00000000..dfbfbd87
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/PendingTaskRunner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class PendingTaskRunner {
+
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  private final BlockingQueue<Runnable> pendingTasks = new 
LinkedBlockingQueue<>();
+
+  private final int reconnectDelay;
+
+  public PendingTaskRunner(int reconnectDelay) {
+    this.reconnectDelay = reconnectDelay;
+  }
+
+  public void start() {
+    scheduler.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          pendingTasks.take().run();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }, 0, reconnectDelay, MILLISECONDS);
+  }
+
+  public void shutdown() {
+    scheduler.shutdown();
+  }
+
+  public BlockingQueue<Runnable> getPendingTasks() {
+    return pendingTasks;
+  }
+
+  public int getReconnectDelay() {
+    return reconnectDelay;
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/ReconnectStreamObserver.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/ReconnectStreamObserver.java
new file mode 100644
index 00000000..938656fa
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/ReconnectStreamObserver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ReconnectStreamObserver<T> implements StreamObserver<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final LoadBalanceContext loadContext;
+
+  private final MessageSender messageSender;
+
+  public ReconnectStreamObserver(
+      LoadBalanceContext loadContext, MessageSender messageSender) {
+    this.loadContext = loadContext;
+    this.messageSender = messageSender;
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    LOG.error("Failed to process grpc coordinate command.", t);
+    loadContext.getGrpcOnErrorHandler().handle(messageSender);
+  }
+
+  @Override
+  public void onCompleted() {
+    // Do nothing here
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSender.java
new file mode 100644
index 00000000..c3947913
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSender.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import com.google.common.base.Optional;
+import org.apache.servicecomb.saga.omega.connector.grpc.MessageSenderPicker;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+
+public class SagaLoadBalanceSender extends LoadBalanceSenderAdapter {
+
+  public SagaLoadBalanceSender(LoadBalanceContext loadContext,
+      MessageSenderPicker senderPicker) {
+    super(loadContext, senderPicker);
+  }
+
+  @Override
+  public AlphaResponse send(TxEvent event) {
+    do {
+      final MessageSender messageSender = pickMessageSender();
+      Optional<AlphaResponse> response = doGrpcSend(messageSender, event, new 
SenderExecutor<TxEvent>() {
+        @Override
+        public AlphaResponse apply(TxEvent event) {
+          return messageSender.send(event);
+        }
+      });
+      if (response.isPresent()) return response.get();
+    } while (!Thread.currentThread().isInterrupted());
+
+    throw new OmegaException("Failed to send event " + event + " due to 
interruption");
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SenderExecutor.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SenderExecutor.java
new file mode 100644
index 00000000..fb31eb5c
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SenderExecutor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+
+public interface SenderExecutor<T> {
+
+  AlphaResponse apply(T event);
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
new file mode 100644
index 00000000..fe20337d
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import com.google.common.base.Optional;
+import org.apache.servicecomb.saga.omega.connector.grpc.MessageSenderPicker;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+public class TccLoadBalanceSender extends LoadBalanceSenderAdapter implements 
TccMessageSender {
+
+  public TccLoadBalanceSender(LoadBalanceContext loadContext,
+      MessageSenderPicker senderPicker) {
+    super(loadContext, senderPicker);
+  }
+
+  @Override
+  public AlphaResponse participate(ParticipatedEvent participateEvent) {
+    do {
+      final TccMessageSender messageSender = pickMessageSender();
+      Optional<AlphaResponse> response = doGrpcSend(messageSender, 
participateEvent, new SenderExecutor<ParticipatedEvent>() {
+        @Override
+        public AlphaResponse apply(ParticipatedEvent event) {
+          return messageSender.participate(event);
+        }
+      });
+      if (response.isPresent()) return response.get();
+    } while (!Thread.currentThread().isInterrupted());
+
+    throw new OmegaException("Failed to send event " + participateEvent + " 
due to interruption");
+  }
+
+  @Override
+  public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+    do {
+      final TccMessageSender messageSender = pickMessageSender();
+      Optional<AlphaResponse> response = doGrpcSend(messageSender, 
tccStartEvent, new SenderExecutor<TccStartedEvent>() {
+        @Override
+        public AlphaResponse apply(TccStartedEvent event) {
+          return messageSender.tccTransactionStart(event);
+        }
+      });
+      if (response.isPresent()) return response.get();
+    } while (!Thread.currentThread().isInterrupted());
+
+    throw new OmegaException("Failed to send event " + tccStartEvent + " due 
to interruption");
+  }
+
+  @Override
+  public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+    do {
+      final TccMessageSender messageSender = pickMessageSender();
+      Optional<AlphaResponse> response = doGrpcSend(messageSender, 
tccEndEvent, new SenderExecutor<TccEndedEvent>() {
+        @Override
+        public AlphaResponse apply(TccEndedEvent event) {
+          return messageSender.tccTransactionStop(event);
+        }
+      });
+      if (response.isPresent()) return response.get();
+    } while (!Thread.currentThread().isInterrupted());
+
+    throw new OmegaException("Failed to send event " + tccEndEvent + " due to 
interruption");
+  }
+
+  @Override
+  public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+    do {
+      final TccMessageSender messageSender = pickMessageSender();
+      Optional<AlphaResponse> response = doGrpcSend(messageSender, 
coordinatedEvent, new SenderExecutor<CoordinatedEvent>() {
+        @Override
+        public AlphaResponse apply(CoordinatedEvent event) {
+          return messageSender.coordinate(event);
+        }
+      });
+      if (response.isPresent()) return response.get();
+    } while (!Thread.currentThread().isInterrupted());
+
+    throw new OmegaException("Failed to send event " + coordinatedEvent + " 
due to interruption");
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccMessageSender.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccMessageSender.java
new file mode 100644
index 00000000..00cad149
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccMessageSender.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+public interface TccMessageSender extends MessageSender {
+
+  AlphaResponse participate(ParticipatedEvent participateEvent);
+
+  AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent);
+
+  AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent);
+
+  AlphaResponse coordinate(CoordinatedEvent coordinatedEvent);
+
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TransactionType.java
 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TransactionType.java
new file mode 100644
index 00000000..06108dd0
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TransactionType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+public enum TransactionType {
+  TCC, SAGA
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
index b4cfef23..51e17aff 100644
--- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
@@ -26,7 +26,7 @@
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
@@ -69,7 +69,7 @@
 
   private final ServiceConfig serviceConfig = new 
ServiceConfig(uniquify("Service"));
   private final String address = uniquify("Address");
-  private final MessageHandler handler = mock(MessageHandler.class);
+  private final TccMessageHandler handler = mock(TccMessageHandler.class);
   private GrpcTccEventService service;
 
   @Before
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
new file mode 100644
index 00000000..37e8aed3
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class LoadBalanceContextBuilderTest {
+
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+  private final AlphaClusterConfig clusterConfig = 
mock(AlphaClusterConfig.class);
+  private final TccMessageHandler tccMessageHandler = 
mock(CoordinateMessageHandler.class);
+  private final String serverName = uniquify("serviceName");
+  private final ServiceConfig serviceConfig = new ServiceConfig(serverName);
+  protected final String[] addresses = {"localhost:8080", "localhost:8090"};
+
+  private  LoadBalanceContextBuilder tccLoadBalanceContextBuilder;
+  private  LoadBalanceContextBuilder sagaLoadBalanceContextBuilder;
+
+  @Before
+  public void setup() throws IOException {
+    // Create a server, add service, start, and register for automatic 
graceful shutdown.
+    
grpcCleanup.register(InProcessServerBuilder.forName("localhost:8080").directExecutor().build().start());
+    
grpcCleanup.register(InProcessServerBuilder.forName("localhost:8090").directExecutor().build().start());
+    
when(clusterConfig.getAddresses()).thenReturn(Lists.newArrayList(addresses));
+    when(clusterConfig.getTccMessageHandler()).thenReturn(tccMessageHandler);
+    tccLoadBalanceContextBuilder =
+        new LoadBalanceContextBuilder(TransactionType.TCC, clusterConfig, 
serviceConfig, 30);
+    sagaLoadBalanceContextBuilder =
+        new LoadBalanceContextBuilder(TransactionType.SAGA, clusterConfig, 
serviceConfig, 30);
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void buildTccLoadBalanceContextWithoutSsl() {
+    when(clusterConfig.isEnableSSL()).thenReturn(false);
+
+    LoadBalanceContext loadContext = tccLoadBalanceContextBuilder.build();
+    assertThat(loadContext.getPendingTaskRunner().getReconnectDelay(), is(30));
+    assertThat(loadContext.getSenders().size(), is(2));
+    assertThat(loadContext.getSenders().keySet().iterator().next(), 
instanceOf(TccMessageSender.class));
+    assertThat(loadContext.getSenders().values().iterator().next(), is(0l));
+    assertThat(loadContext.getChannels().size(), is(2));
+    loadContext.getSenders().keySet().iterator().next().close();
+    shutdownChannels(loadContext);
+  }
+
+  @Test
+  public void buildTccLoadBalanceContextWithSsl() {
+    when(clusterConfig.isEnableSSL()).thenReturn(true);
+    
when(clusterConfig.getCert()).thenReturn(getClass().getClassLoader().getResource("client.crt").getFile());
+    
when(clusterConfig.getCertChain()).thenReturn(getClass().getClassLoader().getResource("ca.crt").getFile());
+    
when(clusterConfig.getKey()).thenReturn(getClass().getClassLoader().getResource("client.pem").getFile());
+    LoadBalanceContext loadContext = tccLoadBalanceContextBuilder.build();
+    assertThat(loadContext.getPendingTaskRunner().getReconnectDelay(), is(30));
+    assertThat(loadContext.getSenders().size(), is(2));
+    assertThat(loadContext.getSenders().keySet().iterator().next(), 
instanceOf(TccMessageSender.class));
+    assertThat(loadContext.getSenders().values().iterator().next(), is(0l));
+    assertThat(loadContext.getChannels().size(), is(2));
+    shutdownChannels(loadContext);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void throwExceptionWhenAddressIsNotExist() {
+    when(clusterConfig.getAddresses()).thenReturn(new ArrayList<String>());
+    tccLoadBalanceContextBuilder.build();
+  }
+
+  @Test
+  public void buildSagaLoadBalanceContextWithoutSsl() {
+
+  }
+
+  @Test
+  public void buildSagaLoadBalanceContextWithSsl() {
+
+  }
+
+  private void shutdownChannels(LoadBalanceContext loadContext) {
+    for (ManagedChannel each : loadContext.getChannels()) {
+      each.shutdownNow();
+    }
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
new file mode 100644
index 00000000..b95e9bcd
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import io.grpc.Server;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public abstract class LoadBalanceSenderTestBase {
+
+  protected static final int[] ports = {8080, 8090};
+
+  protected static final Map<Integer, Server> servers = new HashMap<>();
+
+  protected static final Map<Integer, Integer> delays = new HashMap<Integer, 
Integer>() {{
+    put(8080, 0);
+    put(8090, 800);
+  }};
+
+  protected static final Map<Integer, Queue<String>> connected = new 
HashMap<Integer, Queue<String>>() {{
+    put(8080, new ConcurrentLinkedQueue<String>());
+    put(8090, new ConcurrentLinkedQueue<String>());
+  }};
+
+  protected static final Map<Integer, Queue<Object>> eventsMap = new 
HashMap<Integer, Queue<Object>>() {{
+    put(8080, new ConcurrentLinkedQueue<>());
+    put(8090, new ConcurrentLinkedQueue<>());
+  }};
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
new file mode 100644
index 00000000..bebe7b28
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import static org.junit.Assert.fail;
+
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
+import java.util.Queue;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MyTccEventServiceImpl extends 
TccEventServiceGrpc.TccEventServiceImplBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final GrpcAck ALLOW = 
GrpcAck.newBuilder().setAborted(false).build();
+
+  private static final GrpcAck REJECT = 
GrpcAck.newBuilder().setAborted(true).build();
+
+  private final Queue<String> connected;
+  private final Queue<Object> events;
+  private final int delay;
+
+  public MyTccEventServiceImpl(Queue<String> connected, Queue<Object> events, 
int delay) {
+    this.connected = connected;
+    this.events = events;
+    this.delay = delay;
+  }
+
+
+  @Override
+  public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+    connected.offer("Connected " + request.getServiceName());
+    sleep();
+    LOG.info("Established connection service [{}] instanceId [{}].", 
request.getServiceName(), request.getInstanceId());
+  }
+
+  @Override
+  public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request,
+      StreamObserver<GrpcAck> responseObserver) {
+    LOG.info("Received transaction start event, global tx id: {}", 
request.getGlobalTxId());
+    events.offer(request);
+    sleep();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void participate(GrpcTccParticipatedEvent request, 
StreamObserver<GrpcAck> responseObserver) {
+    LOG.info("Received participated event from service {} , global tx id: {}, 
local tx id: {}",
+        request.getServiceName(),
+        request.getGlobalTxId(), request.getLocalTxId());
+    events.offer(request);
+    sleep();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, 
StreamObserver<GrpcAck> responseObserver) {
+    LOG.info("Received transaction end event, global tx id: {}", 
request.getGlobalTxId());
+    events.offer(request);
+    sleep();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onTccCoordinated(GrpcTccCoordinatedEvent request, 
StreamObserver<GrpcAck> responseObserver) {
+    LOG.info("Received coordinated event, global tx: {}, local tx: {}, parent 
id: {}, "
+            + "method: {}, status: {}, service [{}] instanceId [{}]",
+        request.getGlobalTxId(), request.getLocalTxId(), 
request.getParentTxId(),
+        request.getMethodName(), request.getStatus(), 
request.getServiceName(), request.getInstanceId());
+    events.offer(request);
+    sleep();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onDisconnected(GrpcServiceConfig request, 
StreamObserver<GrpcAck> responseObserver) {
+    connected.add("Disconnected " + request.getServiceName());
+    sleep();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  private void sleep() {
+    try {
+      Thread.sleep(delay);
+    } catch (InterruptedException e) {
+      fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
new file mode 100644
index 00000000..cd8eab7e
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import org.junit.After;
+import org.junit.Before;
+
+public class SagaLoadBalanceSenderTest {
+
+  @Before
+  public void setup() {
+
+  }
+
+  @After
+  public void teardown() {
+
+  }
+
+}
diff --git 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
new file mode 100644
index 00000000..afec0439
--- /dev/null
+++ 
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.netty.NettyServerBuilder;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import org.apache.servicecomb.saga.omega.connector.grpc.FastestSender;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.hamcrest.core.Is;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TccLoadBalanceSenderTest extends LoadBalanceSenderTestBase {
+  private final AlphaClusterConfig clusterConfig = 
mock(AlphaClusterConfig.class);
+  private final TccMessageHandler tccMessageHandler = 
mock(CoordinateMessageHandler.class);
+
+  protected final String[] addresses = {"localhost:8080", "localhost:8090"};
+
+  private LoadBalanceContext loadContext;
+  private TccLoadBalanceSender tccLoadBalanceSender;
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String methodName = uniquify("methodName");
+  private final String confirmMethod = uniquify("confirmMethod");
+  private final String cancelMethod = uniquify("cancleMethod");
+  private final String serviceName = uniquify("serviceName");
+
+  private final ServiceConfig serviceConfig = new ServiceConfig(serviceName);
+
+  private ParticipatedEvent participatedEvent;
+  private TccStartedEvent tccStartedEvent;
+  private TccEndedEvent tccEndedEvent;
+  private CoordinatedEvent coordinatedEvent;
+
+  @BeforeClass
+  public static void startServer() throws IOException {
+    for (Integer each : ports) {
+      startServerOnPort(each);
+    }
+  }
+
+  private static void startServerOnPort(int port) {
+    ServerBuilder<?> serverBuilder = NettyServerBuilder.forAddress(
+        new InetSocketAddress("127.0.0.1", port));
+    serverBuilder.addService(new MyTccEventServiceImpl(connected.get(port), 
eventsMap.get(port), delays.get(port)));
+    Server server = serverBuilder.build();
+
+    try {
+      server.start();
+      servers.put(port, server);
+    } catch (Exception ex) {
+      fail(ex.getMessage());
+    }
+  }
+
+  @Before
+  public void setup() {
+    
when(clusterConfig.getAddresses()).thenReturn(Lists.newArrayList(addresses));
+    when(clusterConfig.getTccMessageHandler()).thenReturn(tccMessageHandler);
+    when(clusterConfig.isEnableSSL()).thenReturn(false);
+
+    loadContext =
+        new LoadBalanceContextBuilder(TransactionType.TCC, clusterConfig, 
serviceConfig, 30).build();
+    tccLoadBalanceSender = new TccLoadBalanceSender(loadContext, new 
FastestSender());
+    participatedEvent = new ParticipatedEvent(globalTxId, localTxId, 
parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed);
+    tccStartedEvent = new TccStartedEvent(globalTxId, localTxId);
+    tccEndedEvent = new TccEndedEvent(globalTxId, localTxId, 
TransactionStatus.Succeed);
+    coordinatedEvent = new CoordinatedEvent(globalTxId, localTxId, parentTxId, 
methodName, TransactionStatus.Succeed);
+  }
+
+  @After
+  public void teardown() {
+    tccLoadBalanceSender.close();
+
+    for (Queue<Object> queue :eventsMap.values()) {
+      queue.clear();
+    }
+    for (Queue<String> queue :connected.values()) {
+      queue.clear();
+    }
+  }
+
+  @Test
+  public void participatedSucceed() {
+    Iterator<Long> iterator = loadContext.getSenders().values().iterator();
+    assertThat(iterator.next(), is(0L));
+    assertThat(iterator.next(), is(0L));
+
+    Iterator<MessageSender> keySet = 
loadContext.getSenders().keySet().iterator();
+    loadContext.getSenders().put(keySet.next(), Long.MAX_VALUE);
+    TccMessageSender expectSender = (TccMessageSender) keySet.next();
+
+    // assert expected message sender
+    TccMessageSender actualSender = tccLoadBalanceSender.pickMessageSender();
+    assertThat(actualSender.target(), is(expectSender.target()));
+
+    AlphaResponse response = 
tccLoadBalanceSender.participate(participatedEvent);
+    assertThat(loadContext.getSenders().get(actualSender), greaterThan(0L));
+    assertThat(response.aborted(), is(false));
+
+    Integer expectPort = Integer.valueOf(expectSender.target().split(":")[1]);
+    GrpcTccParticipatedEvent result = (GrpcTccParticipatedEvent) 
eventsMap.get(expectPort).poll();
+    assertThat(result.getGlobalTxId(), is(globalTxId));
+    assertThat(result.getCancelMethod(), is(cancelMethod));
+    assertThat(result.getConfirmMethod(), is(confirmMethod));
+    assertThat(result.getServiceName(), is(serviceName));
+    assertThat(result.getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(result.getParentTxId(), is(parentTxId));
+    assertThat(result.getStatus(), is(TransactionStatus.Succeed.name()));
+  }
+
+  @Test
+  public void participateFailedThenRetry() {
+    tccLoadBalanceSender.onConnected();
+    await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() {
+        return connected.get(8080).size() == 1 && connected.get(8090).size() 
== 1;
+      }
+    });
+    assertThat((connected.get(8080).size() == 1 && connected.get(8090).size() 
== 1), is(true));
+
+    // due to 8090 is slow than 8080, so 8080 will be routed with 2 times.
+    tccLoadBalanceSender.participate(participatedEvent);
+    tccLoadBalanceSender.participate(participatedEvent);
+    tccLoadBalanceSender.participate(participatedEvent);
+    assertThat(eventsMap.get(8080).size(), is(2));
+    assertThat(eventsMap.get(8090).size(), is(1));
+
+    // when 8080 was shutdown, request will be routed to 8090 automatically.
+    servers.get(8080).shutdownNow();
+    tccLoadBalanceSender.participate(participatedEvent);
+    assertThat(eventsMap.get(8090).size(), is(2));
+
+    // when 8080 was recovery, it will be routed again.
+    startServerOnPort(8080);
+    await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() {
+        return connected.get(8080).size() == 3;
+      }
+    });
+    tccLoadBalanceSender.participate(participatedEvent);
+    assertThat(eventsMap.get(8080).size(), is(3));
+  }
+
+  @Test(expected = OmegaException.class)
+  public void participateFailedThenAbort() {
+    TccMessageSender failedSender = mock(GrpcTccClientMessageSender.class);
+    doThrow(new OmegaException("omega 
exception")).when(failedSender).participate((ParticipatedEvent)any());
+    TccMessageSender succeedSender = mock(GrpcTccClientMessageSender.class);
+    when(succeedSender.participate((ParticipatedEvent) any())).thenReturn(new 
AlphaResponse(false));
+
+    Map<MessageSender, Long> senders = Maps.newConcurrentMap();
+    senders.put(failedSender, 0l);
+    senders.put(succeedSender, 10l);
+    loadContext.setSenders(senders);
+    tccLoadBalanceSender.participate(participatedEvent);
+  }
+
+  @Test
+  public void participateInterruptedFailed() throws InterruptedException {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          await().atLeast(1, SECONDS);
+          tccLoadBalanceSender.participate(participatedEvent);
+        } catch (OmegaException e) {
+          assertThat(e.getMessage().endsWith("interruption"), Is.is(true));
+        }
+      }
+    });
+    thread.start();
+    thread.interrupt();
+    thread.join();
+  }
+
+  @Test
+  public void TccStartSucceed() {
+    TccMessageSender actualSender = tccLoadBalanceSender.pickMessageSender();
+    AlphaResponse response = 
tccLoadBalanceSender.tccTransactionStart(tccStartedEvent);
+    assertThat(loadContext.getSenders().get(actualSender), greaterThan(0L));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void TccEndSucceed() {
+    TccMessageSender actualSender = tccLoadBalanceSender.pickMessageSender();
+    AlphaResponse response = 
tccLoadBalanceSender.tccTransactionStop(tccEndedEvent);
+    assertThat(loadContext.getSenders().get(actualSender), greaterThan(0L));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void TccCoordinatedSucceed() {
+    TccMessageSender actualSender = tccLoadBalanceSender.pickMessageSender();
+    AlphaResponse response = tccLoadBalanceSender.coordinate(coordinatedEvent);
+    assertThat(loadContext.getSenders().get(actualSender), greaterThan(0L));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void broadcastConnectionAndDisconnection() {
+    tccLoadBalanceSender.onConnected();
+    await().atMost(1, SECONDS).until(new Callable<Boolean>() {
+
+      @Override
+      public Boolean call() throws Exception {
+        return !connected.get(8080).isEmpty() && 
!connected.get(8090).isEmpty();
+      }
+    });
+
+    Assert.assertThat(connected.get(8080), contains("Connected " + 
serviceName));
+    Assert.assertThat(connected.get(8090), contains("Connected " + 
serviceName));
+
+    tccLoadBalanceSender.onDisconnected();
+    Assert.assertThat(connected.get(8080), contains("Connected " + 
serviceName, "Disconnected " + serviceName));
+    Assert.assertThat(connected.get(8090), contains("Connected " + 
serviceName, "Disconnected " + serviceName));
+  }
+
+}
diff --git 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 2229039f..614038b6 100644
--- 
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ 
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -21,8 +21,13 @@
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import org.apache.servicecomb.saga.omega.connector.grpc.FastestSender;
 import org.apache.servicecomb.saga.omega.connector.grpc.GrpcTccEventService;
 import 
org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
+import org.apache.servicecomb.saga.omega.connector.grpc.tcc.LoadBalanceContext;
+import 
org.apache.servicecomb.saga.omega.connector.grpc.tcc.LoadBalanceContextBuilder;
+import 
org.apache.servicecomb.saga.omega.connector.grpc.tcc.TccLoadBalanceSender;
+import org.apache.servicecomb.saga.omega.connector.grpc.tcc.TransactionType;
 import org.apache.servicecomb.saga.omega.context.CallbackContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -35,6 +40,7 @@
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.DefaultParametersContext;
 import org.apache.servicecomb.saga.omega.transaction.tcc.ParametersContext;
 import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -75,16 +81,15 @@ ParametersContext parametersContext() {
   }
 
   @Bean
-  MessageSender grpcMessageSender(
+  AlphaClusterConfig alphaClusterConfig(
       @Value("${alpha.cluster.address:localhost:8080}") String[] addresses,
       @Value("${alpha.cluster.ssl.enable:false}") boolean enableSSL,
       @Value("${alpha.cluster.ssl.mutualAuth:false}") boolean mutualAuth,
       @Value("${alpha.cluster.ssl.cert:client.crt}") String cert,
       @Value("${alpha.cluster.ssl.key:client.pem}") String key,
       @Value("${alpha.cluster.ssl.certChain:ca.crt}") String certChain,
-      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay,
-      ServiceConfig serviceConfig,
-      @Lazy MessageHandler handler) {
+      @Lazy MessageHandler handler,
+      @Lazy TccMessageHandler tccMessageHandler) {
 
     MessageFormat messageFormat = new KryoMessageFormat();
     AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
@@ -97,10 +102,19 @@ MessageSender grpcMessageSender(
         .messageDeserializer(messageFormat)
         .messageSerializer(messageFormat)
         .messageHandler(handler)
+        .tccMessageHandler(tccMessageHandler)
         .build();
+    return clusterConfig;
+  }
+
+  @Bean(name = "sagaSender")
+  MessageSender grpcMessageSender(
+      AlphaClusterConfig alphaClusterConfig,
+      ServiceConfig serviceConfig,
+      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay) {
 
     final MessageSender sender = new LoadBalancedClusterMessageSender(
-        clusterConfig,
+        alphaClusterConfig,
         serviceConfig,
         reconnectDelay);
 
@@ -116,10 +130,28 @@ public void run() {
     return sender;
   }
 
+  @Bean
+  LoadBalanceContext loadBalanceSenderContext(
+      AlphaClusterConfig alphaClusterConfig,
+      ServiceConfig serviceConfig,
+      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay) {
+    LoadBalanceContext loadBalanceSenderContext = new 
LoadBalanceContextBuilder(
+        TransactionType.TCC,
+        alphaClusterConfig,
+        serviceConfig,
+        reconnectDelay).build();
+    return loadBalanceSenderContext;
+  }
+
+  @Bean(name = "tccSender")
+  TccLoadBalanceSender tccLoadBalanceSender(LoadBalanceContext 
loadBalanceSenderContext) {
+    return new TccLoadBalanceSender(loadBalanceSenderContext, new 
FastestSender());
+  }
+
   // TODO should integrate with loadBalance message sender in future.
   @Bean
   TccEventService tccEventService(ServiceConfig serviceConfig,
-      @Lazy org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler 
coordinateMessageHandler,
+      @Lazy TccMessageHandler coordinateMessageHandler,
       @Value("${alpha.cluster.address:localhost:8080}") String address) {
     ManagedChannel channel = 
ManagedChannelBuilder.forTarget(address).usePlaintext().build();
     final GrpcTccEventService service = new GrpcTccEventService(serviceConfig, 
channel, address, coordinateMessageHandler);
diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index f9a23d0d..5977b474 100644
--- 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -27,6 +27,7 @@
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.tcc.ParametersContext;
 import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.tcc.TccParticipatorAspect;
 import org.apache.servicecomb.saga.omega.transaction.tcc.TccStartAspect;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -39,18 +40,18 @@
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender,
+  MessageHandler messageHandler(@Qualifier("sagaSender") MessageSender sender,
       @Qualifier("compensationContext") CallbackContext context, OmegaContext 
omegaContext) {
     return new CompensationMessageHandler(sender, context);
   }
 
   @Bean
-  SagaStartAspect sagaStartAspect(MessageSender sender, OmegaContext context) {
+  SagaStartAspect sagaStartAspect(@Qualifier("sagaSender")MessageSender 
sender, OmegaContext context) {
     return new SagaStartAspect(sender, context);
   }
 
   @Bean
-  TransactionAspect transactionAspect(MessageSender sender, OmegaContext 
context) {
+  TransactionAspect transactionAspect(@Qualifier("sagaSender")MessageSender 
sender, OmegaContext context) {
     return new TransactionAspect(sender, context);
   }
 
@@ -61,7 +62,7 @@ CompensableAnnotationProcessor 
compensableAnnotationProcessor(OmegaContext omega
   }
 
   @Bean
-  org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler 
coordinateMessageHandler(
+  TccMessageHandler coordinateMessageHandler(
       TccEventService tccEventService,
       @Qualifier("coordinateContext") CallbackContext coordinateContext,
       OmegaContext omegaContext,
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
index 1015f9b8..6230c592 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
@@ -70,7 +70,7 @@ OmegaContext omegaContext(IdGenerator<String> idGenerator) {
     return messages;
   }
 
-  @Bean
+  @Bean(name = "sagaSender")
   MessageSender sender() {
     return new MessageSender() {
       @Override
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java
index 53cf36d0..bc9148b8 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java
@@ -32,6 +32,7 @@
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
@@ -81,7 +82,7 @@
   private String cancelMethod;
 
   @Autowired
-  private org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler 
coordinateMessageHandler;
+  private TccMessageHandler coordinateMessageHandler;
 
   @Before
   public void setUp() throws Exception {
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
index 8d5e2730..17b6df81 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -22,7 +22,7 @@
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 
-public class CoordinateMessageHandler implements MessageHandler {
+public class CoordinateMessageHandler implements TccMessageHandler {
 
   private final TccEventService tccEventService;
 
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccMessageHandler.java
similarity index 96%
rename from 
omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
rename to 
omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccMessageHandler.java
index 87589548..461c310a 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccMessageHandler.java
@@ -17,6 +17,6 @@
 
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
-public interface MessageHandler {
+public interface TccMessageHandler {
   void onReceive(String globalTxId, String localTxId, String parentTxId, 
String method);
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add fault tolerance for service comb TCC
> ----------------------------------------
>
>                 Key: SCB-909
>                 URL: https://issues.apache.org/jira/browse/SCB-909
>             Project: Apache ServiceComb
>          Issue Type: New Feature
>          Components: Saga
>    Affects Versions: saga-0.3.0
>            Reporter: cherrylzhao
>            Assignee: cherrylzhao
>            Priority: Major
>             Fix For: saga-0.3.0
>
>
> TCC fault tolerance incude following point.
> # omega can switch to another available alpha when sending message failed.
> # if omega resend logic (different alpha) failed, omega can rollback local 
> data automatically.
> # alpha do resend logic (same omega) when ACK failed, if resend failed, dirty 
> data can left in database, this will be handled by scanner.
> # design transaction timeout mechanics, if scanner found event won't 
> completed within expected time, sending componsate command to omega, it has 
> different type compare with normal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to