[
https://issues.apache.org/jira/browse/SCB-909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623709#comment-16623709
]
ASF GitHub Bot commented on SCB-909:
------------------------------------
WillemJiang closed pull request #312: SCB-909 Integrate TccLoadBalance into
omega-transaction
URL: https://github.com/apache/incubator-servicecomb-saga/pull/312
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
index 99bff3f8..43fb8fcb 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
@@ -19,7 +19,6 @@
import java.util.Collections;
import java.util.List;
-import org.apache.servicecomb.saga.omega.connector.grpc.tcc.TccMessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
deleted file mode 100644
index c619584a..00000000
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.connector.grpc;
-
-import
org.apache.servicecomb.saga.omega.connector.grpc.tcc.GrpcCoordinateStreamObserver;
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
-import
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
-import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
-import
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
-
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
-import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
-import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
-import
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
-import
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
-
-import io.grpc.ManagedChannel;
-
-public class GrpcTccEventService implements TccEventService {
- private final GrpcServiceConfig serviceConfig;
- private final String target;
- private final TccEventServiceBlockingStub tccBlockingEventService;
- private final TccEventServiceStub tccAsyncEventService;
- private final GrpcCoordinateStreamObserver observer;
-
- public GrpcTccEventService(ServiceConfig serviceConfig,
- ManagedChannel channel,
- String address,
- TccMessageHandler handler
- ) {
- this.target = address;
- tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
- tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
- this.serviceConfig = serviceConfig(serviceConfig.serviceName(),
serviceConfig.instanceId());
- observer = new GrpcCoordinateStreamObserver(null, null, handler);
- }
-
- @Override
- public void onConnected() {
- tccAsyncEventService.onConnected(serviceConfig, observer);
- }
-
- @Override
- public void onDisconnected() {
- tccBlockingEventService.onDisconnected(serviceConfig);
- }
-
- @Override
- public void close() {
- // do nothing here
- }
-
- @Override
- public String target() {
- return target;
- }
-
- @Override
- public AlphaResponse participate(ParticipatedEvent participateEvent) {
- GrpcAck grpcAck =
tccBlockingEventService.participate(convertTo(participateEvent));
- return new AlphaResponse(grpcAck.getAborted());
- }
-
- @Override
- public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
- GrpcAck grpcAck =
tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent));
- return new AlphaResponse(grpcAck.getAborted());
- }
-
-
- @Override
- public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
- GrpcAck grpcAck =
tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent));
- return new AlphaResponse(grpcAck.getAborted());
-
- }
-
- @Override
- public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
- GrpcAck grpcAck =
tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent));
- return new AlphaResponse(grpcAck.getAborted());
- }
-
- private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent)
{
- return GrpcTccCoordinatedEvent.newBuilder()
- .setServiceName(serviceConfig.getServiceName())
- .setInstanceId(serviceConfig.getInstanceId())
- .setGlobalTxId(coordinatedEvent.getGlobalTxId())
- .setLocalTxId(coordinatedEvent.getLocalTxId())
- .setParentTxId(coordinatedEvent.getParentTxId())
- .setMethodName(coordinatedEvent.getMethodName())
- .setStatus(coordinatedEvent.getStatus().toString())
- .build();
- }
-
- private GrpcServiceConfig serviceConfig(String serviceName, String
instanceId) {
- return GrpcServiceConfig.newBuilder()
- .setServiceName(serviceName)
- .setInstanceId(instanceId)
- .build();
- }
-
- private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent
tccStartEvent) {
- return GrpcTccTransactionStartedEvent.newBuilder()
- .setServiceName(serviceConfig.getServiceName())
- .setInstanceId(serviceConfig.getInstanceId())
- .setGlobalTxId(tccStartEvent.getGlobalTxId())
- .setLocalTxId(tccStartEvent.getLocalTxId())
- .build();
- }
-
- private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) {
- return GrpcTccTransactionEndedEvent.newBuilder()
- .setServiceName(serviceConfig.getServiceName())
- .setInstanceId(serviceConfig.getInstanceId())
- .setGlobalTxId(tccEndEvent.getGlobalTxId())
- .setLocalTxId(tccEndEvent.getLocalTxId())
- .setStatus(tccEndEvent.getStatus().toString())
- .build();
- }
-
- private GrpcTccParticipatedEvent convertTo(ParticipatedEvent
participateEvent) {
- return GrpcTccParticipatedEvent.newBuilder()
- .setServiceName(serviceConfig.getServiceName())
- .setInstanceId(serviceConfig.getInstanceId())
- .setGlobalTxId(participateEvent.getGlobalTxId())
- .setLocalTxId(participateEvent.getLocalTxId())
- .setParentTxId(participateEvent.getParentTxId())
- .setCancelMethod(participateEvent.getCancelMethod())
- .setConfirmMethod(participateEvent.getConfirmMethod())
- .setStatus(participateEvent.getStatus().toString())
- .build();
- }
-}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
deleted file mode 100644
index c3507555..00000000
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.connector.grpc;
-
-import static java.util.Collections.emptyList;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import com.google.common.base.Supplier;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import java.io.File;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import javax.net.ssl.SSLException;
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LoadBalancedClusterMessageSender implements MessageSender {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
- private final Collection<ManagedChannel> channels;
-
- private final BlockingQueue<Runnable> pendingTasks = new
LinkedBlockingQueue<>();
- private final BlockingQueue<MessageSender> availableMessageSenders = new
LinkedBlockingQueue<>();
- private final MessageSender retryableMessageSender = new
RetryableMessageSender(
- availableMessageSenders);
-
- private final Supplier<MessageSender> defaultMessageSender = new
Supplier<MessageSender>() {
- @Override
- public MessageSender get() {
- return retryableMessageSender;
- }
- };
-
- private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
-
- public LoadBalancedClusterMessageSender(AlphaClusterConfig clusterConfig,
- ServiceConfig serviceConfig,
- int reconnectDelay) {
-
- if (clusterConfig.getAddresses().isEmpty()) {
- throw new IllegalArgumentException("No reachable cluster address
provided");
- }
-
- channels = new ArrayList<>(clusterConfig.getAddresses().size());
-
- SslContext sslContext = null;
- for (String address : clusterConfig.getAddresses()) {
- ManagedChannel channel;
-
- if (clusterConfig.isEnableSSL()) {
- if (sslContext == null) {
- try {
- sslContext = buildSslContext(clusterConfig);
- } catch (SSLException e) {
- throw new IllegalArgumentException("Unable to build SslContext",
e);
- }
- }
- channel = NettyChannelBuilder.forTarget(address)
- .negotiationType(NegotiationType.TLS)
- .sslContext(sslContext)
- .build();
- } else {
- channel = ManagedChannelBuilder.forTarget(address).usePlaintext()
- .build();
- }
- channels.add(channel);
- senders.put(
- new GrpcClientMessageSender(
- address,
- channel,
- clusterConfig.getMessageSerializer(),
- clusterConfig.getMessageDeserializer(),
- serviceConfig,
- new ErrorHandlerFactory(),
- clusterConfig.getMessageHandler()),
- 0L);
- }
-
- scheduleReconnectTask(reconnectDelay);
- }
-
- // this is for test only
- LoadBalancedClusterMessageSender(MessageSender... messageSenders) {
- for (MessageSender sender : messageSenders) {
- senders.put(sender, 0L);
- }
- channels = emptyList();
- }
-
- @Override
- public void onConnected() {
- for(MessageSender sender :senders.keySet()){
- try {
- sender.onConnected();
- } catch (Exception e) {
- LOG.error("Failed connecting to alpha at {}", sender.target(), e);
- }
- }
- }
-
- @Override
- public void onDisconnected() {
- for (MessageSender sender :senders.keySet()) {
- try {
- sender.onDisconnected();
- } catch (Exception e) {
- LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
- }
- }
- }
-
- @Override
- public void close() {
- scheduler.shutdown();
- for(ManagedChannel channel : channels) {
- channel.shutdownNow();
- }
- }
-
- @Override
- public String target() {
- return "UNKNOWN";
- }
-
- @Override
- public AlphaResponse send(TxEvent event) {
- return send(event, new FastestSender());
- }
-
- AlphaResponse send(TxEvent event, MessageSenderPicker messageSenderPicker) {
- do {
- MessageSender messageSender = messageSenderPicker.pick(senders,
defaultMessageSender);
-
- try {
- long startTime = System.nanoTime();
- AlphaResponse response = messageSender.send(event);
- senders.put(messageSender, System.nanoTime() - startTime);
-
- return response;
- } catch (OmegaException e) {
- throw e;
- } catch (Exception e) {
- LOG.error("Retry sending event {} due to failure", event, e);
-
- // very large latency on exception
- senders.put(messageSender, Long.MAX_VALUE);
- }
- } while (!Thread.currentThread().isInterrupted());
-
- throw new OmegaException("Failed to send event " + event + " due to
interruption");
- }
-
- private void scheduleReconnectTask(int reconnectDelay) {
- scheduler.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- pendingTasks.take().run();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }, 0, reconnectDelay, MILLISECONDS);
- }
-
- class ErrorHandlerFactory {
- Runnable getHandler(MessageSender messageSender) {
- final Runnable runnable = new PushBackReconnectRunnable(messageSender,
senders, pendingTasks,
- availableMessageSenders);
- return new Runnable() {
- @Override
- public void run() {
- pendingTasks.offer(runnable);
- }
- };
- }
-
- }
-
- private static SslContext buildSslContext(AlphaClusterConfig clusterConfig)
throws SSLException {
- SslContextBuilder builder = GrpcSslContexts.forClient();
- // openssl must be used because some older JDk does not support cipher
suites required by http2,
- // and the performance of JDK ssl is pretty low compared to openssl.
- builder.sslProvider(SslProvider.OPENSSL);
-
- Properties prop = new Properties();
- try {
-
prop.load(LoadBalancedClusterMessageSender.class.getClassLoader().getResourceAsStream("ssl.properties"));
- } catch (IOException e) {
- throw new IllegalArgumentException("Unable to read ssl.properties.", e);
- }
-
- builder.protocols(prop.getProperty("protocols").split(","));
- builder.ciphers(Arrays.asList(prop.getProperty("ciphers").split(",")));
- builder.trustManager(new File(clusterConfig.getCertChain()));
-
- if (clusterConfig.isEnableMutualAuth()) {
- builder.keyManager(new File(clusterConfig.getCert()), new
File(clusterConfig.getKey()));
- }
-
- return builder.build();
- }
-}
-
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/FastestSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/FastestSender.java
similarity index 61%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/FastestSender.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/FastestSender.java
index ed758402..486c1b7d 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/FastestSender.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/FastestSender.java
@@ -6,16 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import com.google.common.base.Supplier;
import java.util.Map;
@@ -25,13 +25,11 @@
* The strategy of picking the fastest {@link MessageSender}
*/
public class FastestSender implements MessageSenderPicker {
-
@Override
- public MessageSender pick(Map<MessageSender, Long> messageSenders,
- Supplier<MessageSender> defaultSender) {
+ public MessageSender pick(Map<? extends MessageSender, Long> messageSenders,
Supplier<MessageSender> defaultSender) {
Long min = Long.MAX_VALUE;
MessageSender sender = null;
- for (Map.Entry<MessageSender, Long> entry : messageSenders.entrySet()) {
+ for (Map.Entry<? extends MessageSender, Long> entry :
messageSenders.entrySet()) {
if (entry.getValue() != Long.MAX_VALUE && min > entry.getValue()) {
min = entry.getValue();
sender = entry.getKey();
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcOnErrorHandler.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
similarity index 73%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcOnErrorHandler.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
index 8e9b26f5..7ab02a15 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcOnErrorHandler.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/GrpcOnErrorHandler.java
@@ -6,24 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import com.google.common.base.Supplier;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import
org.apache.servicecomb.saga.omega.connector.grpc.PushBackReconnectRunnable;
-import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
public class GrpcOnErrorHandler {
@@ -61,12 +60,14 @@ public GrpcRetryContext getGrpcRetryContext() {
private final BlockingQueue<MessageSender> reconnectedSenders = new
LinkedBlockingQueue<>();
- private final MessageSender retryMessageSender = new
RetryableMessageSender(reconnectedSenders);
-
private final Supplier<MessageSender> defaultMessageSender = new
Supplier<MessageSender>() {
@Override
public MessageSender get() {
- return retryMessageSender;
+ try {
+ return reconnectedSenders.take();
+ } catch (InterruptedException e) {
+ throw new OmegaException("Failed to get reconnected sender", e);
+ }
}
};
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContext.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
similarity index 78%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContext.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
index 3831a921..2918be85 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContext.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContext.java
@@ -6,16 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import io.grpc.ManagedChannel;
import java.util.Collection;
@@ -57,6 +57,7 @@ public GrpcOnErrorHandler getGrpcOnErrorHandler() {
return grpcOnErrorHandler;
}
+ // this is only for test
public void setSenders(Map<MessageSender, Long> senders) {
this.senders = senders;
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilder.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
similarity index 83%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilder.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
index 0b22ab60..3f3d542a 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilder.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceContextBuilder.java
@@ -6,16 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import com.google.common.base.Optional;
import io.grpc.ManagedChannel;
@@ -36,6 +36,8 @@
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLException;
import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import
org.apache.servicecomb.saga.omega.connector.grpc.saga.GrpcSagaClientMessageSender;
+import
org.apache.servicecomb.saga.omega.connector.grpc.tcc.GrpcTccClientMessageSender;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
@@ -58,7 +60,6 @@ public LoadBalanceContextBuilder(TransactionType
transactionType,
}
public LoadBalanceContext build() {
-
if (clusterConfig.getAddresses().isEmpty()) {
throw new IllegalArgumentException("No reachable cluster address
provided");
}
@@ -101,16 +102,16 @@ private MessageSender buildSender(
address,
clusterConfig.getTccMessageHandler(),
loadContext);
-
case SAGA:
-// return new GrpcClientMessageSender(
-// address,
-// channel,
-// clusterConfig.getMessageSerializer(),
-// clusterConfig.getMessageDeserializer(),
-// serviceConfig,
-// new LoadBalancedClusterMessageSender()ErrorHandlerFactory(),
-// clusterConfig.getMessageHandler());
+ return new GrpcSagaClientMessageSender(
+ address,
+ channel,
+ clusterConfig.getMessageSerializer(),
+ clusterConfig.getMessageDeserializer(),
+ serviceConfig,
+ clusterConfig.getMessageHandler(),
+ loadContext
+ );
default:
}
return null;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderAdapter.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceSenderAdapter.java
similarity index 80%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderAdapter.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceSenderAdapter.java
index b7a8ef2c..424dbbe9 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderAdapter.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/LoadBalanceSenderAdapter.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import com.google.common.base.Optional;
import io.grpc.ManagedChannel;
import java.lang.invoke.MethodHandles;
-import org.apache.servicecomb.saga.omega.connector.grpc.MessageSenderPicker;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +58,6 @@ public LoadBalanceSenderAdapter(
} catch (Exception e) {
LOG.error("Retry sending event {} due to failure", event, e);
loadContext.getSenders().put(messageSender, Long.MAX_VALUE);
-// loadContext.getGrpcOnErrorHandler().handle(messageSender);
}
return Optional.fromNullable(response);
}
@@ -100,12 +97,11 @@ public String target() {
return "UNKNOWN";
}
- @Override
- public AlphaResponse send(TxEvent event) {
- return null;
- }
-
public MessageSenderPicker getSenderPicker() {
return senderPicker;
}
+
+ public LoadBalanceContext getLoadContext() {
+ return loadContext;
+ }
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/MessageSenderPicker.java
similarity index 91%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/MessageSenderPicker.java
index 25a88f17..c4b669de 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/MessageSenderPicker.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import com.google.common.base.Supplier;
import java.util.Collection;
@@ -36,6 +36,6 @@
* @param defaultSender Default sender provider
* @return The specified one.
*/
- MessageSender pick(Map<MessageSender, Long> messageSenders,
+ MessageSender pick(Map<? extends MessageSender, Long> messageSenders,
Supplier<MessageSender> defaultSender);
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/PendingTaskRunner.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/PendingTaskRunner.java
similarity index 78%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/PendingTaskRunner.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/PendingTaskRunner.java
index dfbfbd87..9bb7c959 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/PendingTaskRunner.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/PendingTaskRunner.java
@@ -6,16 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/PushBackReconnectRunnable.java
similarity index 97%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/PushBackReconnectRunnable.java
index 6faffcb2..bc3c1b39 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/PushBackReconnectRunnable.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import java.lang.invoke.MethodHandles;
import java.util.Map;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/ReconnectStreamObserver.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/ReconnectStreamObserver.java
similarity index 72%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/ReconnectStreamObserver.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/ReconnectStreamObserver.java
index 938656fa..bc6971d9 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/ReconnectStreamObserver.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/ReconnectStreamObserver.java
@@ -6,19 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import io.grpc.stub.StreamObserver;
import java.lang.invoke.MethodHandles;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SenderExecutor.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/SenderExecutor.java
similarity index 56%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SenderExecutor.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/SenderExecutor.java
index fb31eb5c..ed0a03f1 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SenderExecutor.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/SenderExecutor.java
@@ -6,16 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/TransactionType.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/TransactionType.java
new file mode 100644
index 00000000..236bb533
--- /dev/null
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/core/TransactionType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc.core;
+
+public enum TransactionType {
+ TCC, SAGA
+}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
similarity index 76%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
index 6aae96ac..cdaec108 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,36 +13,37 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
- *
*/
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
import java.lang.invoke.MethodHandles;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.ReconnectStreamObserver;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.grpc.stub.StreamObserver;
-
-class GrpcCompensateStreamObserver implements
StreamObserver<GrpcCompensateCommand> {
+class GrpcCompensateStreamObserver extends
ReconnectStreamObserver<GrpcCompensateCommand> {
private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final MessageHandler messageHandler;
- private final Runnable errorHandler;
- private final MessageDeserializer deserializer;
-
- GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable
errorHandler, MessageDeserializer deserializer) {
+ public GrpcCompensateStreamObserver(LoadBalanceContext loadContext,
+ MessageSender messageSender,
+ MessageHandler messageHandler, MessageDeserializer deserializer) {
+ super(loadContext, messageSender);
this.messageHandler = messageHandler;
- this.errorHandler = errorHandler;
this.deserializer = deserializer;
}
+ private final MessageHandler messageHandler;
+ private final MessageDeserializer deserializer;
+
+
@Override
public void onNext(GrpcCompensateCommand command) {
LOG.info("Received compensate command, global tx id: {}, local tx id: {},
compensation method: {}",
@@ -56,15 +56,4 @@ public void onNext(GrpcCompensateCommand command) {
command.getCompensationMethod(),
deserializer.deserialize(command.getPayloads().toByteArray()));
}
-
- @Override
- public void onError(Throwable t) {
- LOG.error("Failed to process grpc compensate command.", t);
- errorHandler.run();
- }
-
- @Override
- public void onCompleted() {
- // Do nothing here
- }
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
similarity index 87%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index 13da6cf2..57709200 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -7,26 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
- *
*/
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
-import
org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender.ErrorHandlerFactory;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -36,12 +35,9 @@
import
org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
import
org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
-
-public class GrpcClientMessageSender implements MessageSender {
+public class GrpcSagaClientMessageSender implements SagaMessageSender {
private final String target;
+
private final TxEventServiceStub asyncEventService;
private final MessageSerializer serializer;
@@ -49,23 +45,23 @@
private final TxEventServiceBlockingStub blockingEventService;
private final GrpcCompensateStreamObserver compensateStreamObserver;
+
private final GrpcServiceConfig serviceConfig;
- public GrpcClientMessageSender(
+ public GrpcSagaClientMessageSender(
String address,
ManagedChannel channel,
MessageSerializer serializer,
MessageDeserializer deserializer,
ServiceConfig serviceConfig,
- ErrorHandlerFactory errorHandlerFactory,
- MessageHandler handler) {
+ MessageHandler handler,
+ LoadBalanceContext loadContext) {
this.target = address;
this.asyncEventService = TxEventServiceGrpc.newStub(channel);
this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
this.serializer = serializer;
-
this.compensateStreamObserver =
- new GrpcCompensateStreamObserver(handler,
errorHandlerFactory.getHandler(this), deserializer);
+ new GrpcCompensateStreamObserver(loadContext, this, handler,
deserializer);
this.serviceConfig = serviceConfig(serviceConfig.serviceName(),
serviceConfig.instanceId());
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/RetryableMessageSender.java
similarity index 87%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/RetryableMessageSender.java
index 4eb90e1b..6869dae6 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/RetryableMessageSender.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import java.util.concurrent.BlockingQueue;
-
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-public class RetryableMessageSender implements MessageSender {
+public class RetryableMessageSender implements SagaMessageSender {
private final BlockingQueue<MessageSender> availableMessageSenders;
public RetryableMessageSender(BlockingQueue<MessageSender>
availableMessageSenders) {
@@ -59,7 +59,7 @@ public AlphaResponse send(TxEvent event) {
throw new OmegaException("Failed to process subsequent requests because
no alpha server is available");
}
try {
- return availableMessageSenders.take().send(event);
+ return ((SagaMessageSender)availableMessageSenders.take()).send(event);
} catch (InterruptedException e) {
throw new OmegaException("Failed to send event " + event + " due to
interruption", e);
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalanceSender.java
similarity index 59%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSender.java
rename to
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalanceSender.java
index c3947913..df2a3671 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSender.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/saga/SagaLoadBalanceSender.java
@@ -6,25 +6,28 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.connector.grpc.saga;
import com.google.common.base.Optional;
-import org.apache.servicecomb.saga.omega.connector.grpc.MessageSenderPicker;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.MessageSenderPicker;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceSenderAdapter;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.SenderExecutor;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-public class SagaLoadBalanceSender extends LoadBalanceSenderAdapter {
+public class SagaLoadBalanceSender extends LoadBalanceSenderAdapter implements
SagaMessageSender {
public SagaLoadBalanceSender(LoadBalanceContext loadContext,
MessageSenderPicker senderPicker) {
@@ -34,7 +37,7 @@ public SagaLoadBalanceSender(LoadBalanceContext loadContext,
@Override
public AlphaResponse send(TxEvent event) {
do {
- final MessageSender messageSender = pickMessageSender();
+ final SagaMessageSender messageSender = pickMessageSender();
Optional<AlphaResponse> response = doGrpcSend(messageSender, event, new
SenderExecutor<TxEvent>() {
@Override
public AlphaResponse apply(TxEvent event) {
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
index d69930c3..e2aec97c 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcCoordinateStreamObserver.java
@@ -6,18 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
import java.lang.invoke.MethodHandles;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.ReconnectStreamObserver;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
index e23f366b..c0020f0b 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/GrpcTccClientMessageSender.java
@@ -6,22 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
import io.grpc.ManagedChannel;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
@@ -152,9 +153,4 @@ private GrpcTccParticipatedEvent
convertTo(ParticipatedEvent participateEvent) {
.setStatus(participateEvent.getStatus().toString())
.build();
}
-
- @Override
- public AlphaResponse send(TxEvent event) {
- return null;
- }
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
index fe20337d..a1e762c2 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
+++
b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSender.java
@@ -6,21 +6,25 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
import com.google.common.base.Optional;
-import org.apache.servicecomb.saga.omega.connector.grpc.MessageSenderPicker;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceSenderAdapter;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.MessageSenderPicker;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.SenderExecutor;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccClientMessageSenderTest.java
similarity index 97%
rename from
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
rename to
omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccClientMessageSenderTest.java
index 51e17aff..931e2759 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccClientMessageSenderTest.java
@@ -24,6 +24,7 @@
import static org.mockito.Mockito.verify;
import org.apache.servicecomb.saga.common.TransactionStatus;
+import
org.apache.servicecomb.saga.omega.connector.grpc.tcc.GrpcTccClientMessageSender;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
@@ -53,7 +54,7 @@
import io.grpc.util.MutableHandlerRegistry;
@RunWith(JUnit4.class)
-public class GrpcTccEventServiceTest {
+public class GrpcTccClientMessageSenderTest {
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
private final MutableHandlerRegistry serviceRegistry = new
MutableHandlerRegistry();
@@ -70,7 +71,7 @@
private final ServiceConfig serviceConfig = new
ServiceConfig(uniquify("Service"));
private final String address = uniquify("Address");
private final TccMessageHandler handler = mock(TccMessageHandler.class);
- private GrpcTccEventService service;
+ private GrpcTccClientMessageSender service;
@Before
public void setUp() throws Exception {
@@ -86,7 +87,7 @@ public void setUp() throws Exception {
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a TccEventServiceStub using the in-process channel;
- service = new GrpcTccEventService(serviceConfig, channel, address,
handler);
+ service = new GrpcTccClientMessageSender(serviceConfig, channel, address,
handler, null);
}
@Test
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
index 6de7c01a..c9d0e02e 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
@@ -38,15 +38,19 @@
import java.util.Arrays;
import java.util.concurrent.Callable;
import javax.net.ssl.SSLException;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
+import
org.apache.servicecomb.saga.omega.connector.grpc.saga.SagaLoadBalanceSender;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.junit.BeforeClass;
import org.junit.Test;
public class LoadBalanceClusterMessageSenderWithTLSTest extends
LoadBalancedClusterMessageSenderTestBase {
@Override
- protected MessageSender newMessageSender(String[] addresses) {
+ protected SagaLoadBalanceSender newMessageSender(String[] addresses) {
ClassLoader classLoader = getClass().getClassLoader();
AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
.addresses(ImmutableList.copyOf(addresses))
@@ -60,10 +64,12 @@ protected MessageSender newMessageSender(String[]
addresses) {
.messageDeserializer(deserializer)
.build();
- return new LoadBalancedClusterMessageSender(
+ LoadBalanceContext loadContext = new LoadBalanceContextBuilder(
+ TransactionType.SAGA,
clusterConfig,
- new ServiceConfig(serviceName),
- 100);
+ new ServiceConfig(serviceName), 100).build();
+
+ return new SagaLoadBalanceSender(loadContext, new FastestSender());
}
@BeforeClass
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 491e15f5..1ffdb3b2 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -33,10 +33,18 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
+import
org.apache.servicecomb.saga.omega.connector.grpc.saga.SagaLoadBalanceSender;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
@@ -46,7 +54,7 @@
public class LoadBalancedClusterMessageSenderTest extends
LoadBalancedClusterMessageSenderTestBase {
@Override
- protected MessageSender newMessageSender(String[] addresses) {
+ protected SagaLoadBalanceSender newMessageSender(String[] addresses) {
AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
.addresses(ImmutableList.copyOf(addresses))
.enableSSL(false)
@@ -55,10 +63,13 @@ protected MessageSender newMessageSender(String[]
addresses) {
.messageDeserializer(deserializer)
.messageHandler(handler)
.build();
- return new LoadBalancedClusterMessageSender(
+
+ LoadBalanceContext loadContext = new LoadBalanceContextBuilder(
+ TransactionType.SAGA,
clusterConfig,
- new ServiceConfig(serviceName),
- 100);
+ new ServiceConfig(serviceName), 100).build();
+
+ return new SagaLoadBalanceSender(loadContext, new FastestSender());
}
@BeforeClass
@@ -137,13 +148,12 @@ public Boolean call() throws Exception {
});
}
- @Test(timeout = 1000)
+ @Test
public void stopSendingOnInterruption() throws Exception {
- MessageSender underlying = Mockito.mock(MessageSender.class);
+ SagaMessageSender underlying = Mockito.mock(SagaMessageSender.class);
doThrow(RuntimeException.class).when(underlying).send(event);
- final MessageSender messageSender = new
LoadBalancedClusterMessageSender(underlying);
-
+ setSenders(underlying);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
@@ -182,14 +192,14 @@ public Boolean call() throws Exception {
@Test
public void swallowException_UntilAllSendersConnected() throws Exception {
- MessageSender underlying1 = Mockito.mock(MessageSender.class);
+ SagaMessageSender underlying1 = Mockito.mock(SagaMessageSender.class);
doThrow(RuntimeException.class).when(underlying1).onConnected();
- MessageSender underlying2 = Mockito.mock(MessageSender.class);
+ SagaMessageSender underlying2 = Mockito.mock(SagaMessageSender.class);
- MessageSender sender = new LoadBalancedClusterMessageSender(underlying1,
underlying2);
+ setSenders(underlying1, underlying2);
- sender.onConnected();
+ messageSender.onConnected();
verify(underlying1).onConnected();
verify(underlying2).onConnected();
@@ -197,14 +207,14 @@ public void swallowException_UntilAllSendersConnected()
throws Exception {
@Test
public void swallowException_UntilAllSendersDisconnected() throws Exception {
- MessageSender underlying1 = Mockito.mock(MessageSender.class);
+ SagaMessageSender underlying1 = Mockito.mock(SagaMessageSender.class);
doThrow(RuntimeException.class).when(underlying1).onDisconnected();
- MessageSender underlying2 = Mockito.mock(MessageSender.class);
+ SagaMessageSender underlying2 = Mockito.mock(SagaMessageSender.class);
- MessageSender sender = new LoadBalancedClusterMessageSender(underlying1,
underlying2);
+ setSenders(underlying1, underlying2);
- sender.onDisconnected();
+ messageSender.onDisconnected();
verify(underlying1).onDisconnected();
verify(underlying2).onDisconnected();
@@ -317,4 +327,12 @@ private int killServerReceivedMessage() {
}
throw new IllegalStateException("None of the servers received any
message");
}
+
+ private void setSenders(SagaMessageSender ... underlyings) {
+ Map<MessageSender, Long> senders = new HashMap<>();
+ for (SagaMessageSender each : underlyings) {
+ senders.put(each, 0L);
+ }
+ messageSender.getLoadContext().setSenders(senders);
+ }
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
index 9a2db233..4e96bcd6 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTestBase.java
@@ -21,7 +21,6 @@
import static org.junit.Assert.fail;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,11 +28,12 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.servicecomb.saga.common.EventType;
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import
org.apache.servicecomb.saga.omega.connector.grpc.saga.SagaLoadBalanceSender;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -108,7 +108,7 @@ public void onReceive(String globalTxId, String localTxId,
String parentTxId, St
protected final String[] addresses = {"localhost:8080", "localhost:8090"};
- protected final MessageSender messageSender = newMessageSender(addresses);
+ protected final SagaLoadBalanceSender messageSender =
newMessageSender(addresses);
@AfterClass
public static void tearDown() throws Exception {
@@ -117,7 +117,7 @@ public static void tearDown() throws Exception {
}
}
- protected abstract MessageSender newMessageSender(String[] addresses);
+ protected abstract SagaLoadBalanceSender newMessageSender(String[]
addresses);
@After
public void after() throws Exception {
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
index 4a12f1f1..aad214f9 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
@@ -30,8 +30,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.PushBackReconnectRunnable;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Test;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index c1edb264..9f97eb62 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -27,9 +27,10 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-
+import
org.apache.servicecomb.saga.omega.connector.grpc.saga.RetryableMessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
@@ -38,7 +39,7 @@
public class RetryableMessageSenderTest {
@SuppressWarnings("unchecked")
private final BlockingQueue<MessageSender> availableMessageSenders = new
LinkedBlockingQueue<>();
- private final MessageSender messageSender = new
RetryableMessageSender(availableMessageSenders);
+ private final SagaMessageSender messageSender = new
RetryableMessageSender(availableMessageSenders);
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
@@ -47,7 +48,7 @@
@Test
public void sendEventWhenSenderIsAvailable() {
- MessageSender sender = mock(MessageSender.class);
+ SagaMessageSender sender = mock(SagaMessageSender.class);
availableMessageSenders.add(sender);
messageSender.send(event);
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
index 37e8aed3..6be785bb 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceContextBuilderTest.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
@@ -31,9 +31,14 @@
import java.io.IOException;
import java.util.ArrayList;
import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import
org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -50,7 +55,7 @@
private final ServiceConfig serviceConfig = new ServiceConfig(serverName);
protected final String[] addresses = {"localhost:8080", "localhost:8090"};
- private LoadBalanceContextBuilder tccLoadBalanceContextBuilder;
+ private LoadBalanceContextBuilder tccLoadBalanceContextBuilder;
private LoadBalanceContextBuilder sagaLoadBalanceContextBuilder;
@Before
@@ -107,12 +112,29 @@ public void throwExceptionWhenAddressIsNotExist() {
@Test
public void buildSagaLoadBalanceContextWithoutSsl() {
-
+ LoadBalanceContext loadContext = sagaLoadBalanceContextBuilder.build();
+ assertThat(loadContext.getPendingTaskRunner().getReconnectDelay(), is(30));
+ assertThat(loadContext.getSenders().size(), is(2));
+ assertThat(loadContext.getSenders().keySet().iterator().next(),
instanceOf(SagaMessageSender.class));
+ assertThat(loadContext.getSenders().values().iterator().next(), is(0l));
+ assertThat(loadContext.getChannels().size(), is(2));
+ loadContext.getSenders().keySet().iterator().next().close();
+ shutdownChannels(loadContext);
}
@Test
public void buildSagaLoadBalanceContextWithSsl() {
-
+ when(clusterConfig.isEnableSSL()).thenReturn(true);
+
when(clusterConfig.getCert()).thenReturn(getClass().getClassLoader().getResource("client.crt").getFile());
+
when(clusterConfig.getCertChain()).thenReturn(getClass().getClassLoader().getResource("ca.crt").getFile());
+
when(clusterConfig.getKey()).thenReturn(getClass().getClassLoader().getResource("client.pem").getFile());
+ LoadBalanceContext loadContext = sagaLoadBalanceContextBuilder.build();
+ assertThat(loadContext.getPendingTaskRunner().getReconnectDelay(), is(30));
+ assertThat(loadContext.getSenders().size(), is(2));
+ assertThat(loadContext.getSenders().keySet().iterator().next(),
instanceOf(SagaMessageSender.class));
+ assertThat(loadContext.getSenders().values().iterator().next(), is(0l));
+ assertThat(loadContext.getChannels().size(), is(2));
+ shutdownChannels(loadContext);
}
private void shutdownChannels(LoadBalanceContext loadContext) {
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
index b95e9bcd..a2691a7a 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/LoadBalanceSenderTestBase.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
index bebe7b28..f2e10cb4 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/MyTccEventServiceImpl.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
index cd8eab7e..3adeee7d 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/SagaLoadBalanceSenderTest.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
diff --git
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
index afec0439..ea7cebf5 100644
---
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
+++
b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccLoadBalanceSenderTest.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
@@ -35,7 +35,6 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
@@ -44,13 +43,17 @@
import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
-import org.apache.servicecomb.saga.omega.connector.grpc.FastestSender;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import
org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
@@ -58,6 +61,7 @@
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
import org.hamcrest.core.Is;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -88,12 +92,19 @@
private CoordinatedEvent coordinatedEvent;
@BeforeClass
- public static void startServer() throws IOException {
+ public static void startServer() {
for (Integer each : ports) {
startServerOnPort(each);
}
}
+ @AfterClass
+ public static void shutdownServer() {
+ for(Server server: servers.values()) {
+ server.shutdown();
+ }
+ }
+
private static void startServerOnPort(int port) {
ServerBuilder<?> serverBuilder = NettyServerBuilder.forAddress(
new InetSocketAddress("127.0.0.1", port));
diff --git
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 614038b6..b1fe7e95 100644
---
a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++
b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,16 +18,13 @@
package org.apache.servicecomb.saga.omega.spring;
import com.google.common.collect.ImmutableList;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
-import org.apache.servicecomb.saga.omega.connector.grpc.FastestSender;
-import org.apache.servicecomb.saga.omega.connector.grpc.GrpcTccEventService;
-import
org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
-import org.apache.servicecomb.saga.omega.connector.grpc.tcc.LoadBalanceContext;
-import
org.apache.servicecomb.saga.omega.connector.grpc.tcc.LoadBalanceContextBuilder;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.FastestSender;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContext;
+import
org.apache.servicecomb.saga.omega.connector.grpc.core.LoadBalanceContextBuilder;
+import org.apache.servicecomb.saga.omega.connector.grpc.core.TransactionType;
+import
org.apache.servicecomb.saga.omega.connector.grpc.saga.SagaLoadBalanceSender;
import
org.apache.servicecomb.saga.omega.connector.grpc.tcc.TccLoadBalanceSender;
-import org.apache.servicecomb.saga.omega.connector.grpc.tcc.TransactionType;
import org.apache.servicecomb.saga.omega.context.CallbackContext;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -36,11 +33,11 @@
import org.apache.servicecomb.saga.omega.format.KryoMessageFormat;
import org.apache.servicecomb.saga.omega.format.MessageFormat;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import
org.apache.servicecomb.saga.omega.transaction.tcc.DefaultParametersContext;
import org.apache.servicecomb.saga.omega.transaction.tcc.ParametersContext;
-import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -107,30 +104,34 @@ AlphaClusterConfig alphaClusterConfig(
return clusterConfig;
}
- @Bean(name = "sagaSender")
- MessageSender grpcMessageSender(
+ @Bean(name = "sagaLoadContext")
+ LoadBalanceContext sagaLoadBalanceSenderContext(
AlphaClusterConfig alphaClusterConfig,
ServiceConfig serviceConfig,
@Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay) {
-
- final MessageSender sender = new LoadBalancedClusterMessageSender(
+ LoadBalanceContext loadBalanceSenderContext = new
LoadBalanceContextBuilder(
+ TransactionType.SAGA,
alphaClusterConfig,
serviceConfig,
- reconnectDelay);
+ reconnectDelay).build();
+ return loadBalanceSenderContext;
+ }
- sender.onConnected();
-
+ @Bean
+ SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext")
LoadBalanceContext loadBalanceSenderContext) {
+ final SagaMessageSender sagaMessageSender = new
SagaLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
+ sagaMessageSender.onConnected();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- sender.onDisconnected();
- sender.close();
+ sagaMessageSender.onDisconnected();
+ sagaMessageSender.close();
}
}));
- return sender;
+ return sagaMessageSender;
}
- @Bean
+ @Bean(name = "tccLoadContext")
LoadBalanceContext loadBalanceSenderContext(
AlphaClusterConfig alphaClusterConfig,
ServiceConfig serviceConfig,
@@ -143,28 +144,17 @@ LoadBalanceContext loadBalanceSenderContext(
return loadBalanceSenderContext;
}
- @Bean(name = "tccSender")
- TccLoadBalanceSender tccLoadBalanceSender(LoadBalanceContext
loadBalanceSenderContext) {
- return new TccLoadBalanceSender(loadBalanceSenderContext, new
FastestSender());
- }
-
- // TODO should integrate with loadBalance message sender in future.
@Bean
- TccEventService tccEventService(ServiceConfig serviceConfig,
- @Lazy TccMessageHandler coordinateMessageHandler,
- @Value("${alpha.cluster.address:localhost:8080}") String address) {
- ManagedChannel channel =
ManagedChannelBuilder.forTarget(address).usePlaintext().build();
- final GrpcTccEventService service = new GrpcTccEventService(serviceConfig,
channel, address, coordinateMessageHandler);
- // Need to register it self first
- service.onConnected();
-
+ TccMessageSender tccLoadBalanceSender(@Qualifier("tccLoadContext")
LoadBalanceContext loadBalanceSenderContext) {
+ final TccMessageSender tccMessageSender = new
TccLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
+ tccMessageSender.onConnected();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- service.onDisconnected();
- service.close();
+ tccMessageSender.onDisconnected();
+ tccMessageSender.close();
}
}));
- return service;
+ return tccMessageSender;
}
}
diff --git
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 5977b474..e5ab9684 100644
---
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -21,13 +21,13 @@
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import
org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.SagaStartAspect;
import org.apache.servicecomb.saga.omega.transaction.TransactionAspect;
import
org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
import org.apache.servicecomb.saga.omega.transaction.tcc.ParametersContext;
-import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccParticipatorAspect;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccStartAspect;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -40,18 +40,18 @@
public class TransactionAspectConfig {
@Bean
- MessageHandler messageHandler(@Qualifier("sagaSender") MessageSender sender,
+ MessageHandler messageHandler(SagaMessageSender sender,
@Qualifier("compensationContext") CallbackContext context, OmegaContext
omegaContext) {
return new CompensationMessageHandler(sender, context);
}
@Bean
- SagaStartAspect sagaStartAspect(@Qualifier("sagaSender")MessageSender
sender, OmegaContext context) {
+ SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext
context) {
return new SagaStartAspect(sender, context);
}
@Bean
- TransactionAspect transactionAspect(@Qualifier("sagaSender")MessageSender
sender, OmegaContext context) {
+ TransactionAspect transactionAspect(SagaMessageSender sender, OmegaContext
context) {
return new TransactionAspect(sender, context);
}
@@ -63,21 +63,25 @@ CompensableAnnotationProcessor
compensableAnnotationProcessor(OmegaContext omega
@Bean
TccMessageHandler coordinateMessageHandler(
- TccEventService tccEventService,
+ TccMessageSender tccMessageSender,
@Qualifier("coordinateContext") CallbackContext coordinateContext,
OmegaContext omegaContext,
ParametersContext parametersContext) {
- return new CoordinateMessageHandler(tccEventService, coordinateContext,
omegaContext, parametersContext);
+ return new CoordinateMessageHandler(tccMessageSender, coordinateContext,
omegaContext, parametersContext);
}
@Bean
- TccStartAspect tccStartAspect(TccEventService tccEventService, OmegaContext
context) {
- return new TccStartAspect(tccEventService, context);
+ TccStartAspect tccStartAspect(
+ TccMessageSender tccMessageSender,
+ OmegaContext context) {
+ return new TccStartAspect(tccMessageSender, context);
}
@Bean
- TccParticipatorAspect tccParticipatorAspect(TccEventService tccEventService,
OmegaContext context, ParametersContext parametersContext) {
- return new TccParticipatorAspect(tccEventService, context,
parametersContext);
+ TccParticipatorAspect tccParticipatorAspect(
+ TccMessageSender tccMessageSender,
+ OmegaContext context, ParametersContext parametersContext) {
+ return new TccParticipatorAspect(tccMessageSender, context,
parametersContext);
}
@Bean
diff --git
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
index 6230c592..d99bc1b3 100644
---
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
+++
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
@@ -24,10 +24,12 @@
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.SagaMessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import
org.apache.servicecomb.saga.omega.transaction.tcc.DefaultParametersContext;
import org.apache.servicecomb.saga.omega.transaction.tcc.ParametersContext;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccMessageSender;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
@@ -71,8 +73,8 @@ OmegaContext omegaContext(IdGenerator<String> idGenerator) {
}
@Bean(name = "sagaSender")
- MessageSender sender() {
- return new MessageSender() {
+ SagaMessageSender sender() {
+ return new SagaMessageSender() {
@Override
public void onConnected() {
@@ -102,8 +104,8 @@ public AlphaResponse send(TxEvent event) {
}
@Bean
- TccEventService tccEventService() {
- return new TccEventService() {
+ TccMessageSender TccMessageSender() {
+ return new TccMessageSender() {
@Override
public void onConnected() {
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 588d6604..5c5bb183 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -21,9 +21,9 @@
class CompensableInterceptor implements EventAwareInterceptor {
private final OmegaContext context;
- private final MessageSender sender;
+ private final SagaMessageSender sender;
- CompensableInterceptor(OmegaContext context, MessageSender sender) {
+ CompensableInterceptor(OmegaContext context, SagaMessageSender sender) {
this.sender = sender;
this.context = context;
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 75501b9d..2e1e0a3f 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -20,11 +20,11 @@
import org.apache.servicecomb.saga.omega.context.CallbackContext;
public class CompensationMessageHandler implements MessageHandler {
- private final MessageSender sender;
+ private final SagaMessageSender sender;
private final CallbackContext context;
- public CompensationMessageHandler(MessageSender sender, CallbackContext
context) {
+ public CompensationMessageHandler(SagaMessageSender sender, CallbackContext
context) {
this.sender = sender;
this.context = context;
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 019d8dd4..40620823 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -6,18 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.transaction;
public interface MessageSender {
+
void onConnected();
void onDisconnected();
@@ -26,5 +27,4 @@
String target();
- AlphaResponse send(TxEvent event);
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TransactionType.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaMessageSender.java
similarity index 84%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TransactionType.java
rename to
omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaMessageSender.java
index 06108dd0..3a6c67cf 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TransactionType.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaMessageSender.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.transaction;
-public enum TransactionType {
- TCC, SAGA
+public interface SagaMessageSender extends MessageSender {
+
+ AlphaResponse send(TxEvent event);
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index b0b2de4b..4c3b1228 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -23,9 +23,9 @@
class SagaStartAnnotationProcessor {
private final OmegaContext omegaContext;
- private final MessageSender sender;
+ private final SagaMessageSender sender;
- SagaStartAnnotationProcessor(OmegaContext omegaContext, MessageSender
sender) {
+ SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender
sender) {
this.omegaContext = omegaContext;
this.sender = sender;
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 0dc3279e..0e09f496 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -36,7 +36,7 @@
private final OmegaContext context;
- public SagaStartAspect(MessageSender sender, OmegaContext context) {
+ public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
this.context = context;
this.sagaStartAnnotationProcessor = new
SagaStartAnnotationProcessor(context, sender);
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index f7a98ee7..ee247ceb 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -37,7 +37,7 @@
private final CompensableInterceptor interceptor;
- public TransactionAspect(MessageSender sender, OmegaContext context) {
+ public TransactionAspect(SagaMessageSender sender, OmegaContext context) {
this.context = context;
this.interceptor = new CompensableInterceptor(context, sender);
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
index 17b6df81..657c9fbb 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -24,7 +24,7 @@
public class CoordinateMessageHandler implements TccMessageHandler {
- private final TccEventService tccEventService;
+ private final TccMessageSender tccMessageSender;
private final CallbackContext callbackContext;
@@ -32,10 +32,10 @@
private final ParametersContext parametersContext;
- public CoordinateMessageHandler(TccEventService tccEventService,
+ public CoordinateMessageHandler(TccMessageSender tccMessageSender,
CallbackContext callbackContext, OmegaContext omegaContext,
ParametersContext parametersContext) {
- this.tccEventService = tccEventService;
+ this.tccMessageSender = tccMessageSender;
this.callbackContext = callbackContext;
this.omegaContext = omegaContext;
this.parametersContext = parametersContext;
@@ -46,7 +46,7 @@ public void onReceive(String globalTxId, String localTxId,
String parentTxId, St
// TODO need to catch the exception and send the failed message
// The parameter need to be updated here
callbackContext.apply(globalTxId, localTxId, methodName,
parametersContext.getParameters(localTxId));
- tccEventService.coordinate(new CoordinatedEvent(globalTxId, localTxId,
parentTxId, methodName, TransactionStatus.Succeed));
+ tccMessageSender.coordinate(new CoordinatedEvent(globalTxId, localTxId,
parentTxId, methodName, TransactionStatus.Succeed));
// Need to remove the parameter
parametersContext.removeParameter(localTxId);
}
diff --git
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccMessageSender.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccMessageSender.java
similarity index 96%
rename from
omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccMessageSender.java
rename to
omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccMessageSender.java
index 00cad149..2a9055eb 100644
---
a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/tcc/TccMessageSender.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccMessageSender.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.connector.grpc.tcc;
+package org.apache.servicecomb.saga.omega.transaction.tcc;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
index e3a8fccc..88496605 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
@@ -35,15 +35,15 @@
private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OmegaContext context;
- private final TccEventService tccEventService;
+ private final TccMessageSender tccMessageSender;
private final ParametersContext parametersContext;
//We need to inject the CoordinateMessageHandler for the parameter map
- public TccParticipatorAspect(TccEventService tccEventService, OmegaContext
context,
+ public TccParticipatorAspect(TccMessageSender tccMessageSender, OmegaContext
context,
ParametersContext parametersContext) {
this.context = context;
- this.tccEventService = tccEventService;
+ this.tccMessageSender = tccMessageSender;
this.parametersContext = parametersContext;
}
@@ -60,7 +60,7 @@ Object advise(ProceedingJoinPoint joinPoint, Participate
participate) throws Thr
try {
Object result = joinPoint.proceed();
// Send the participate message back
- tccEventService.participate(new ParticipatedEvent(context.globalTxId(),
context.localTxId(), localTxId, confirmMethod,
+ tccMessageSender.participate(new ParticipatedEvent(context.globalTxId(),
context.localTxId(), localTxId, confirmMethod,
cancelMethod, TransactionStatus.Succeed));
// Just store the parameters into the context
parametersContext.putParamters(context.localTxId(), joinPoint.getArgs());
@@ -68,7 +68,7 @@ Object advise(ProceedingJoinPoint joinPoint, Participate
participate) throws Thr
return result;
} catch (Throwable throwable) {
// Now we don't handle the error message
- tccEventService.participate(new ParticipatedEvent(context.globalTxId(),
context.localTxId(), localTxId, confirmMethod,
+ tccMessageSender.participate(new ParticipatedEvent(context.globalTxId(),
context.localTxId(), localTxId, confirmMethod,
cancelMethod, TransactionStatus.Failed));
LOG.error("Participate Transaction with context {} failed.", context,
throwable);
throw throwable;
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
index 137a3972..1d4cf24d 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -17,7 +17,6 @@
package org.apache.servicecomb.saga.omega.transaction.tcc;
import javax.transaction.TransactionalException;
-
import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
@@ -28,30 +27,30 @@
public class TccStartAnnotationProcessor {
private final OmegaContext omegaContext;
- private final TccEventService eventService;
+ private final TccMessageSender tccMessageSender;
- TccStartAnnotationProcessor(OmegaContext omegaContext, TccEventService
eventService) {
+ TccStartAnnotationProcessor(OmegaContext omegaContext, TccMessageSender
tccMessageSender) {
this.omegaContext = omegaContext;
- this.eventService = eventService;
+ this.tccMessageSender = tccMessageSender;
}
public AlphaResponse preIntercept(String parentTxId, String methodName, int
timeout) {
try {
- return eventService.tccTransactionStart(new
TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ return tccMessageSender.tccTransactionStart(new
TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
}
public void postIntercept(String parentTxId, String methodName) {
- eventService.tccTransactionStop(new
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ tccMessageSender.tccTransactionStop(new
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
TransactionStatus.Succeed));
}
public void onError(String parentTxId, String methodName, Throwable
throwable) {
// Send the cancel event
// Do we need to wait for the alpha finish all the transaction
- eventService.tccTransactionStop(new
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ tccMessageSender.tccTransactionStop(new
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
TransactionStatus.Failed));
}
}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
index 62a2873c..8d0bf839 100644
---
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
@@ -37,9 +37,9 @@
private final OmegaContext context;
- public TccStartAspect(TccEventService tccEventServicer, OmegaContext
context) {
+ public TccStartAspect(TccMessageSender tccMessageSender, OmegaContext
context) {
this.context = context;
- this.tccStartAnnotationProcessor = new
TccStartAnnotationProcessor(context, tccEventServicer);
+ this.tccStartAnnotationProcessor = new
TccStartAnnotationProcessor(context, tccMessageSender);
}
@Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart
* *(..)) && @annotation(tccStart)")
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index aefc9a45..aec9d5a7 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -42,7 +42,7 @@
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
- private final MessageSender sender =new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
}
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index b9edb073..ce40d711 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -34,7 +34,7 @@
public class CompensationMessageHandlerTest {
private final List<TxEvent> events = new ArrayList<>();
- private final MessageSender sender = new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
index edbabb9e..3f8b17fd 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
@@ -67,7 +67,7 @@
private final Compensable compensable = mock(Compensable.class);
- private final MessageSender sender = new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
@@ -165,7 +165,7 @@ public void recordAbortedEventWhenFailed() throws Throwable
{
@Test
public void returnImmediatelyWhenReceivedRejectResponse() {
- MessageSender sender = mock(MessageSender.class);
+ SagaMessageSender sender = mock(SagaMessageSender.class);
when(sender.send(any(TxEvent.class))).thenReturn(new AlphaResponse(true));
CompensableInterceptor interceptor = new
CompensableInterceptor(omegaContext, sender);
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
index 9ba4b474..98b3980e 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
@@ -66,7 +66,7 @@
private final Compensable compensable = mock(Compensable.class);
- private final MessageSender sender = new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
@@ -116,7 +116,7 @@ public void setUp() throws Exception {
@Test
public void forwardExceptionWhenGlobalTxAborted() {
- MessageSender sender = mock(MessageSender.class);
+ SagaMessageSender sender = mock(SagaMessageSender.class);
when(sender.send(any(TxEvent.class))).thenReturn(new AlphaResponse(true));
CompensableInterceptor interceptor = new
CompensableInterceptor(omegaContext, sender);
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 6d50e6f8..9052e2f1 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -40,7 +40,7 @@
private final List<TxEvent> messages = new ArrayList<>();
- private final MessageSender sender = new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
@@ -116,7 +116,7 @@ public void sendsSagaEndedEvent() {
@Test
public void transformInterceptedException() {
- MessageSender sender = mock(MessageSender.class);
+ SagaMessageSender sender = mock(SagaMessageSender.class);
SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new
SagaStartAnnotationProcessor(
context, sender);
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 4025c1c5..a9ff3388 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -35,14 +35,13 @@
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
public class SagaStartAspectTest {
private final List<TxEvent> messages = new ArrayList<>();
private final String globalTxId = UUID.randomUUID().toString();
- private final MessageSender sender = new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
}
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index d7fade6b..3c051409 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -42,7 +42,7 @@
private final String localTxId = UUID.randomUUID().toString();
private final String newLocalTxId = UUID.randomUUID().toString();
- private final MessageSender sender = new MessageSender() {
+ private final SagaMessageSender sender = new SagaMessageSender() {
@Override
public void onConnected() {
}
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
index e2e07691..6dbb7211 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.List;
-
import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.context.CallbackContext;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -35,13 +34,12 @@
import
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mock;
import org.mockito.Mockito;
public class CoordinateMessageHandlerTest {
private final List<CoordinatedEvent> coordinatedEvents = new ArrayList<>();
private final AlphaResponse response = new AlphaResponse(false);
- private final TccEventService eventService = new TccEventService() {
+ private final TccMessageSender tccMessageSender = new TccMessageSender() {
@Override
public void onConnected() {
@@ -92,7 +90,7 @@ public AlphaResponse coordinate(CoordinatedEvent
coordinatedEvent) {
private final CallbackContext callbackContext =
Mockito.mock(CallbackContext.class);
private final OmegaContext omegaContext = Mockito.mock(OmegaContext.class);
private final ParametersContext parametersContext =
Mockito.mock(ParametersContext.class);
- private final CoordinateMessageHandler handler = new
CoordinateMessageHandler(eventService, callbackContext, omegaContext,
parametersContext);
+ private final CoordinateMessageHandler handler = new
CoordinateMessageHandler(tccMessageSender, callbackContext, omegaContext,
parametersContext);
@Before
public void setUp() {
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
index d308876d..f52da038 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
@@ -52,7 +52,7 @@
private final AlphaResponse response = new AlphaResponse(false);
private String confirmMethod;
private String cancelMethod;
- private final TccEventService eventService = new TccEventService() {
+ private final TccMessageSender tccMessageSender = new TccMessageSender() {
@Override
public void onConnected() {
@@ -103,7 +103,7 @@ public AlphaResponse coordinate(CoordinatedEvent
coordinatedEvent) {
private final Participate participate = mock(Participate.class);
private final ParametersContext parametersContext = new
DefaultParametersContext();
- private final TccParticipatorAspect aspect = new
TccParticipatorAspect(eventService, omegaContext, parametersContext);
+ private final TccParticipatorAspect aspect = new
TccParticipatorAspect(tccMessageSender, omegaContext, parametersContext);
@Before
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
index fa34ad7d..18538741 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
@@ -53,7 +53,7 @@
private final IdGenerator<String> generator = mock(IdGenerator.class);
private final OmegaContext context = new OmegaContext(generator);
private final OmegaException exception = new OmegaException("exception", new
RuntimeException("runtime exception"));
- private final TccEventService eventService = new TccEventService() {
+ private final TccMessageSender tccMessageSender = new TccMessageSender() {
@Override
public void onConnected() {
@@ -100,7 +100,7 @@ public AlphaResponse coordinate(CoordinatedEvent
coordinatedEvent) {
}
};
private final TccStartAnnotationProcessor tccStartAnnotationProcessor = new
TccStartAnnotationProcessor(context,
- eventService);
+ tccMessageSender);
@Before
public void setUp() throws Exception {
diff --git
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
index 80f14e46..0fe096d3 100644
---
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
+++
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
@@ -49,7 +49,7 @@
private final List<TccStartedEvent> startedEvents = new ArrayList<>();
private final List<TccEndedEvent> endedEvents = new ArrayList<>();
private final AlphaResponse response = new AlphaResponse(false);
- private final TccEventService eventService = new TccEventService() {
+ private final TccMessageSender tccMessageSender = new TccMessageSender() {
@Override
public void onConnected() {
@@ -99,7 +99,7 @@ public AlphaResponse coordinate(CoordinatedEvent
coordinatedEvent) {
private final TccStart tccStart = Mockito.mock(TccStart.class);
private final OmegaContext omegaContext = new OmegaContext(idGenerator);
- private final TccStartAspect aspect = new TccStartAspect(eventService,
omegaContext);
+ private final TccStartAspect aspect = new TccStartAspect(tccMessageSender,
omegaContext);
@Before
public void setUp() throws Exception {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add fault tolerance for service comb TCC
> ----------------------------------------
>
> Key: SCB-909
> URL: https://issues.apache.org/jira/browse/SCB-909
> Project: Apache ServiceComb
> Issue Type: New Feature
> Components: Saga
> Affects Versions: saga-0.3.0
> Reporter: cherrylzhao
> Assignee: cherrylzhao
> Priority: Major
> Fix For: saga-0.3.0
>
>
> TCC fault tolerance incude following point.
> # omega can switch to another available alpha when sending message failed.
> # if omega resend logic (different alpha) failed, omega can rollback local
> data automatically.
> # alpha do resend logic (same omega) when ACK failed, if resend failed, dirty
> data can left in database, this will be handled by scanner.
> # design transaction timeout mechanics, if scanner found event won't
> completed within expected time, sending componsate command to omega, it has
> different type compare with normal.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)