This is an automated email from the ASF dual-hosted git repository. seanyinx pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit b54587d3b578e172a77727e2a041e0732364252b Author: Eric Lee <[email protected]> AuthorDate: Tue Jan 16 16:51:48 2018 +0800 SCB-234 rethrow exception in saga start annotation processor Signed-off-by: Eric Lee <[email protected]> --- .../grpc/LoadBalancedClusterMessageSender.java | 13 +++-------- .../grpc/LoadBalancedClusterMessageSenderTest.java | 8 +------ .../connector/grpc/RetryableMessageSenderTest.java | 8 +++---- .../saga/omega/spring/OmegaSpringConfig.java | 10 +------- omega/omega-transaction/pom.xml | 4 ++++ .../transaction/SagaStartAnnotationProcessor.java | 8 ++++++- .../SagaStartAnnotationProcessorTest.java | 27 +++++++++++++++++++--- pom.xml | 6 +++++ 8 files changed, 50 insertions(+), 34 deletions(-) diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java index b518524..9a78a62 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java @@ -53,8 +53,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender { private final Collection<ManagedChannel> channels; private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>(); - private final BlockingQueue<MessageSender> availableMessageSenders; - private final MessageSender retryableMessageSender; + private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); + private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public LoadBalancedClusterMessageSender(String[] addresses, @@ -62,17 +62,12 @@ public class LoadBalancedClusterMessageSender implements MessageSender { MessageDeserializer deserializer, ServiceConfig serviceConfig, MessageHandler handler, - int reconnectDelay, - BlockingQueue<MessageSender> availableMessageSenders, - MessageSender retryableMessageSender) { + int reconnectDelay) { if (addresses.length == 0) { throw new IllegalArgumentException("No reachable cluster address provided"); } - this.availableMessageSenders = availableMessageSenders; - this.retryableMessageSender = retryableMessageSender; - channels = new ArrayList<>(addresses.length); for (String address : addresses) { ManagedChannel channel = ManagedChannelBuilder.forTarget(address) @@ -101,8 +96,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender { senders.put(sender, 0L); } channels = emptyList(); - availableMessageSenders = new LinkedBlockingQueue<>(); - retryableMessageSender = new RetryableMessageSender(availableMessageSenders); } @Override diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java index 315c5ae..8062ae9 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -37,9 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.omega.context.ServiceConfig; @@ -103,8 +101,6 @@ public class LoadBalancedClusterMessageSenderTest { private final String serviceName = uniquify("serviceName"); private final String[] addresses = {"localhost:8080", "localhost:8090"}; - private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); - private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders); private final MessageSender messageSender = newMessageSender(addresses); private MessageSender newMessageSender(String[] addresses) { @@ -114,9 +110,7 @@ public class LoadBalancedClusterMessageSenderTest { deserializer, new ServiceConfig(serviceName), handler, - 100, - availableMessageSenders, - retryableMessageSender); + 100); } @BeforeClass diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java index 7ffbf9a..562c50f 100644 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java @@ -24,9 +24,9 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.servicecomb.saga.omega.transaction.MessageSender; import org.apache.servicecomb.saga.omega.transaction.OmegaException; @@ -37,7 +37,7 @@ import org.junit.Test; public class RetryableMessageSenderTest { @SuppressWarnings("unchecked") - private final BlockingQueue<MessageSender> availableMessageSenders = mock(BlockingQueue.class); + private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); private final MessageSender messageSender = new RetryableMessageSender(availableMessageSenders); private final String globalTxId = uniquify("globalTxId"); @@ -45,9 +45,9 @@ public class RetryableMessageSenderTest { private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x"); @Test - public void sendEventWhenSenderIsAvailable() throws InterruptedException { + public void sendEventWhenSenderIsAvailable() { MessageSender sender = mock(MessageSender.class); - when(availableMessageSenders.take()).thenReturn(sender); + availableMessageSenders.add(sender); messageSender.send(event); diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 78321a4..fa4027b 100644 --- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -17,11 +17,7 @@ package org.apache.servicecomb.saga.omega.spring; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender; -import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender; import org.apache.servicecomb.saga.omega.context.CompensationContext; import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; @@ -67,17 +63,13 @@ class OmegaSpringConfig { @Lazy MessageHandler handler) { MessageFormat messageFormat = new KryoMessageFormat(); - BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>(); - MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders); MessageSender sender = new LoadBalancedClusterMessageSender( addresses, messageFormat, messageFormat, serviceConfig, handler, - reconnectDelay, - availableMessageSenders, - retryableMessageSender); + reconnectDelay); sender.onConnected(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml index a2bf293..258770c 100644 --- a/omega/omega-transaction/pom.xml +++ b/omega/omega-transaction/pom.xml @@ -46,6 +46,10 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + </dependency> <dependency> <groupId>junit</groupId> diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java index 7299b25..7ef021a 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java @@ -17,6 +17,8 @@ package org.apache.servicecomb.saga.omega.transaction; +import javax.transaction.TransactionalException; + import org.apache.servicecomb.saga.omega.context.OmegaContext; class SagaStartAnnotationProcessor implements EventAwareInterceptor { @@ -31,7 +33,11 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor { @Override public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { - return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); + try { + return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); + } catch (OmegaException e) { + throw new TransactionalException(e.getMessage(), e.getCause()); + } } @Override diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java index f8e936d..566a456 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java @@ -17,15 +17,21 @@ package org.apache.servicecomb.saga.omega.transaction; +import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import javax.transaction.TransactionalException; + import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; @@ -43,12 +49,10 @@ public class SagaStartAnnotationProcessorTest { private final String globalTxId = UUID.randomUUID().toString(); - private final String localTxId = UUID.randomUUID().toString(); - @SuppressWarnings("unchecked") private final IdGenerator<String> generator = mock(IdGenerator.class); - private final OmegaContext context = new OmegaContext(generator); + private final OmegaException exception = new OmegaException("exception", new RuntimeException("runtime exception")); private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender); @@ -86,4 +90,21 @@ public class SagaStartAnnotationProcessorTest { assertThat(event.type(), is(EventType.SagaEndedEvent)); assertThat(event.payloads().length, is(0)); } + + @Test + public void transformInterceptedException() { + MessageSender sender = mock(MessageSender.class); + SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender); + + doThrow(exception).when(sender).send(any()); + + try { + sagaStartAnnotationProcessor.preIntercept(null, null); + expectFailing(TransactionalException.class); + } catch (TransactionalException e) { + assertThat(e.getMessage(), is("exception")); + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertThat(e.getCause().getMessage(), is("runtime exception")); + } + } } diff --git a/pom.xml b/pom.xml index 1d41a45..9338b2f 100755 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ <maven.failsafe.version>2.19.1</maven.failsafe.version> <grpc.version>1.8.0</grpc.version> <kryo.version>4.0.1</kryo.version> + <javax.transaction.version>1.2</javax.transaction.version> </properties> <name>ServiceComb Saga</name> @@ -347,6 +348,11 @@ <artifactId>kryo</artifactId> <version>${kryo.version}</version> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + <version>${javax.transaction.version}</version> + </dependency> <!-- test dependencies --> <dependency> -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
