This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push: new 7e586b5 SCB-853 Saga-core supports JDK7 now 7e586b5 is described below commit 7e586b569c61b6faafe1d376922a49522fcba848 Author: Willem Jiang <jiangni...@huawei.com> AuthorDate: Mon Aug 20 09:30:14 2018 +0800 SCB-853 Saga-core supports JDK7 now --- saga-core/pom.xml | 10 ++++ .../servicecomb/saga/core/BackwardRecovery.java | 5 ++ .../apache/servicecomb/saga/core/Compensation.java | 32 +++++++++-- .../saga/core/CompositeSagaResponse.java | 31 ++++++++--- .../apache/servicecomb/saga/core/Descriptive.java | 4 +- .../org/apache/servicecomb/saga/core/Fallback.java | 17 +++++- .../servicecomb/saga/core/ForwardRecovery.java | 5 ++ .../servicecomb/saga/core/GraphBasedSaga.java | 15 +++--- .../saga/core/LoggingRecoveryPolicy.java | 5 ++ .../servicecomb/saga/core/NoOpSagaRequest.java | 6 +++ .../apache/servicecomb/saga/core/Operation.java | 8 +-- .../servicecomb/saga/core/RestOperation.java | 14 +++-- .../apache/servicecomb/saga/core/SagaContext.java | 4 +- .../servicecomb/saga/core/SagaContextImpl.java | 35 +++++++----- .../apache/servicecomb/saga/core/SagaEvent.java | 5 ++ .../apache/servicecomb/saga/core/SagaRequest.java | 6 +-- .../servicecomb/saga/core/SagaTaskFactory.java | 10 ++-- .../apache/servicecomb/saga/core/Transaction.java | 18 +++++++ .../servicecomb/saga/core/TransactionConsumer.java | 5 ++ .../saga/core/dag/ByLevelTraveller.java | 5 +- .../saga/core/dag/GraphBasedSagaFactory.java | 3 +- .../servicecomb/saga/core/dag/GraphBuilder.java | 9 ++-- .../saga/core/dag/GraphCycleDetectorImpl.java | 26 +++++---- .../org/apache/servicecomb/saga/core/dag/Node.java | 4 +- .../servicecomb/saga/core/CompensationImpl.java | 5 ++ .../saga/core/CompositeSagaResponseTest.java | 8 +++ .../servicecomb/saga/core/ForwardRecoveryTest.java | 7 ++- .../servicecomb/saga/core/RestOperationTest.java | 18 ++++--- .../servicecomb/saga/core/RetrySagaLogTest.java | 19 +++++-- .../saga/core/SagaExecutionComponentTestBase.java | 44 +++++++++++---- .../servicecomb/saga/core/SagaIntegrationTest.java | 63 ++++++++++++++-------- .../dag/DirectedAcyclicGraphTraversalTest.java | 4 +- .../saga/core/dag/GraphBuilderTest.java | 40 ++++++++------ .../saga/core/dag/GraphCycleDetectorTest.java | 3 +- .../servicecomb/saga/format/JacksonFallback.java | 11 ++++ 35 files changed, 366 insertions(+), 138 deletions(-) diff --git a/saga-core/pom.xml b/saga-core/pom.xml index fd1b47c..e460b1a 100644 --- a/saga-core/pom.xml +++ b/saga-core/pom.xml @@ -95,6 +95,16 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <encoding>UTF-8</encoding> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> </plugins> </build> diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java index e46de2f..799dbbf 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java @@ -46,4 +46,9 @@ public class BackwardRecovery implements RecoveryPolicy { throw e; } } + + @Override + public String description() { + return getClass().getSimpleName(); + } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java index f642011..a6a352b 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java @@ -20,14 +20,40 @@ package org.apache.servicecomb.saga.core; public interface Compensation extends Operation { Compensation SAGA_START_COMPENSATION = new Compensation() { + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } + + @Override + public int retries() { + return DEFAULT_RETRIES; + } }; Compensation SAGA_END_COMPENSATION = new Compensation() { + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } + + @Override + public int retries() { + return DEFAULT_RETRIES; + } }; int DEFAULT_RETRIES = 3; - default int retries() { - return DEFAULT_RETRIES; - } + int retries(); } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java index 02ab48c..84753b9 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java @@ -18,7 +18,6 @@ package org.apache.servicecomb.saga.core; import java.util.Collection; -import java.util.Optional; public class CompositeSagaResponse implements SagaResponse { private final Collection<SagaResponse> responses; @@ -29,19 +28,35 @@ public class CompositeSagaResponse implements SagaResponse { @Override public boolean succeeded() { - return responses.stream().allMatch(SagaResponse::succeeded); + if (responses.size() > 0) { + boolean result = true; + for (SagaResponse response : responses) { + result = result && response.succeeded(); + } + return result; + } else { + return false; + } } @Override public String body() { - Optional<String> reduce = responses.stream() - .map(SagaResponse::body) - .reduce((a, b) -> a + ", " + b) - .map(combined -> "[" + combined + "]"); - - return reduce.orElse("{}"); + StringBuffer result = new StringBuffer(); + if (responses.size() == 0) { + result.append("{}"); + } else { + result.append("["); + for (SagaResponse response : responses) { + result.append(response.body()); + result.append(", "); + } + result.delete(result.length()-2, result.length()); + result.append("]"); + } + return result.toString(); } + public Collection<SagaResponse> responses() { return responses; } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java index bd367af..e8b4f4e 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java @@ -19,7 +19,5 @@ package org.apache.servicecomb.saga.core; interface Descriptive { - default String description() { - return getClass().getSimpleName(); - } + String description(); } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java index fca68a0..8c1068b 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java @@ -19,7 +19,22 @@ package org.apache.servicecomb.saga.core; public interface Fallback extends Operation { - Fallback NOP_FALLBACK = () -> TYPE_NOP; + Fallback NOP_FALLBACK = new Fallback() { + @Override + public String type() { + return TYPE_NOP; + } + + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } + }; String type(); } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java index c783990..75e16ee 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java @@ -50,4 +50,9 @@ public class ForwardRecovery implements RecoveryPolicy { throw new TransactionFailedException(ignored); } } + + @Override + public String description() { + return getClass().getSimpleName(); + } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java index f4544e3..69347fc 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java @@ -65,15 +65,15 @@ public class GraphBasedSaga implements Saga { this.tasks = tasks; this.transactionTaskRunner = new TaskRunner( - traveller(sagaTaskGraph, new FromRootTraversalDirection<>()), + traveller(sagaTaskGraph, new FromRootTraversalDirection<SagaRequest>()), new TransactionTaskConsumer( tasks, sagaContext, - new ExecutorCompletionService<>(executor))); + new ExecutorCompletionService<Operation>(executor))); this.sagaContext = sagaContext; this.compensationTaskRunner = new TaskRunner( - traveller(sagaTaskGraph, new FromLeafTraversalDirection<>()), + traveller(sagaTaskGraph, new FromLeafTraversalDirection<SagaRequest>()), new CompensationTaskConsumer(tasks, sagaContext)); currentTaskRunner = transactionTaskRunner; @@ -92,9 +92,12 @@ public class GraphBasedSaga implements Saga { log.error("Failed to run operation", e); currentTaskRunner = compensationTaskRunner; - sagaContext.handleHangingTransactions(request -> { - tasks.get(request.task()).commit(request, sagaContext.responseOf(request.parents())); - tasks.get(request.task()).compensate(request); + sagaContext.handleHangingTransactions(new TransactionConsumer<SagaRequest>() { + @Override + public void accept(SagaRequest request) { + tasks.get(request.task()).commit(request, sagaContext.responseOf(request.parents())); + tasks.get(request.task()).compensate(request); + } }); } } while (currentTaskRunner.hasNext()); diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java index 9173eca..593aa91 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java @@ -43,4 +43,9 @@ public class LoggingRecoveryPolicy implements RecoveryPolicy { log.info("Completed request id={} for service {}", request.id(), request.serviceName()); return response; } + + @Override + public String description() { + return getClass().getSimpleName(); + } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java index 56ad05e..93bbda9 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.core; import static org.apache.servicecomb.saga.core.Compensation.SAGA_END_COMPENSATION; import static org.apache.servicecomb.saga.core.Compensation.SAGA_START_COMPENSATION; +import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK; import static org.apache.servicecomb.saga.core.Operation.TYPE_NOP; import static org.apache.servicecomb.saga.core.SagaTask.SAGA_END_TASK; import static org.apache.servicecomb.saga.core.SagaTask.SAGA_START_TASK; @@ -65,6 +66,11 @@ public class NoOpSagaRequest implements SagaRequest { } @Override + public Fallback fallback() { + return NOP_FALLBACK; + } + + @Override public String serviceName() { return "Saga"; } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java index 13b6f8a..99e760b 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java @@ -23,11 +23,7 @@ public interface Operation { String TYPE_REST = "rest"; SagaResponse SUCCESSFUL_SAGA_RESPONSE = new SuccessfulSagaResponse("success"); - default SagaResponse send(String address) { - return SUCCESSFUL_SAGA_RESPONSE; - } + SagaResponse send(String address); - default SagaResponse send(String address, SagaResponse response) { - return send(address); - } + SagaResponse send(String address, SagaResponse response); } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java index 5cce22d..153eb98 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.saga.core; -import static java.util.Collections.emptyMap; - import org.apache.servicecomb.saga.core.application.interpreter.RestRequestChecker; import java.util.Map; @@ -33,7 +31,7 @@ public class RestOperation implements Operation { this.path = path; this.method = method; - this.params = params == null? emptyMap() : params; + this.params = params == null? java.util.Collections.<String, Map<String, String>>emptyMap() : params; } public String path() { @@ -56,4 +54,14 @@ public class RestOperation implements Operation { ", params=" + params + '}'; } + + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java index 309be44..2f76031 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.saga.core; -import java.util.function.Consumer; - public interface SagaContext extends EventContext { boolean isCompensationStarted(); @@ -26,7 +24,7 @@ public interface SagaContext extends EventContext { boolean isCompensationCompleted(SagaRequest request); - void handleHangingTransactions(Consumer<SagaRequest> consumer); + void handleHangingTransactions(TransactionConsumer<SagaRequest> consumer); SagaResponse responseOf(String requestId); diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java index 610ae27..bc4253c 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java @@ -17,16 +17,16 @@ package org.apache.servicecomb.saga.core; -import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.function.Consumer; -import java.util.stream.Collectors; + import org.apache.servicecomb.saga.core.application.interpreter.FromJsonFormat; @@ -86,7 +86,7 @@ public class SagaContextImpl implements SagaContext { } @Override - public void handleHangingTransactions(Consumer<SagaRequest> consumer) { + public void handleHangingTransactions(TransactionConsumer<SagaRequest> consumer) { for (Iterator<SagaRequest> iterator = hangingTransactions.values().iterator(); iterator.hasNext(); ) { consumer.accept(iterator.next()); } @@ -94,13 +94,19 @@ public class SagaContextImpl implements SagaContext { @Override public SagaResponse responseOf(String requestId) { - return completedTransactions.getOrDefault(requestId, SagaResponse.NONE_RESPONSE); + SagaResponse response = completedTransactions.get(requestId); + if (response == null) { + response = SagaResponse.NONE_RESPONSE; + } + return response; } private List<SagaResponse> responsesOf(String[] parentRequestIds) { - return Arrays.stream(parentRequestIds) - .map(this::responseOf) - .collect(Collectors.toList()); + List<SagaResponse> result = new ArrayList<>(); + for(String parentRequestId: parentRequestIds) { + result.add(responseOf(parentRequestId)); + } + return result; } @Override @@ -125,9 +131,14 @@ public class SagaContextImpl implements SagaContext { } private Set<String> chosenChildrenOf(String[] parentRequestIds) { - return Arrays.stream(parentRequestIds) - .map(this::responseOf) - .flatMap(sagaResponse -> childrenExtractor.fromJson(sagaResponse.body()).stream()) - .collect(Collectors.toSet()); + Set<String> result = new HashSet<>(); + for(String parentRequestId: parentRequestIds) { + SagaResponse response = responseOf(parentRequestId); + Set<String> jsons = childrenExtractor.fromJson(response.body()); + for (String json : jsons) { + result.add(json); + } + } + return result; } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java index ceb0ebe..ffe31d0 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java @@ -36,4 +36,9 @@ public abstract class SagaEvent implements Descriptive { public String json(ToJsonFormat toJsonFormat) { return "{}"; } + + @Override + public String description() { + return getClass().getSimpleName(); + } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java index 337abdd..4578e42 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.saga.core; -import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK; - public interface SagaRequest { String PARAM_FORM = "form"; @@ -30,9 +28,7 @@ public interface SagaRequest { Compensation compensation(); - default Fallback fallback() { - return NOP_FALLBACK; - } + Fallback fallback(); String serviceName(); diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java index 0fd2450..b6d9585 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java @@ -36,12 +36,12 @@ public class SagaTaskFactory { retrySagaLog = new RetrySagaLog(persistentStore, retryDelay); } - public Map<String, SagaTask> sagaTasks(String sagaId, - String requestJson, - RecoveryPolicy recoveryPolicy, - EventStore sagaLog) { + public Map<String, SagaTask> sagaTasks(final String sagaId, + final String requestJson, + final RecoveryPolicy recoveryPolicy, + final EventStore sagaLog) { - SagaLog compositeSagaLog = compositeSagaLog(sagaLog, persistentStore); + final SagaLog compositeSagaLog = compositeSagaLog(sagaLog, persistentStore); return new HashMap<String, SagaTask>() {{ put(SagaTask.SAGA_START_TASK, new SagaStartTask(sagaId, requestJson, compositeSagaLog)); diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java index fafb845..389f8fa 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java @@ -20,8 +20,26 @@ package org.apache.servicecomb.saga.core; public interface Transaction extends Operation { Transaction SAGA_START_TRANSACTION = new Transaction() { + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } }; Transaction SAGA_END_TRANSACTION = new Transaction() { + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } }; } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java new file mode 100644 index 0000000..b5bb098 --- /dev/null +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java @@ -0,0 +1,5 @@ +package org.apache.servicecomb.saga.core; + +public interface TransactionConsumer<T> { + void accept(T request); +} diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java index 22bc874..2489117 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java @@ -59,7 +59,10 @@ public class ByLevelTraveller<T> implements Traveller<T> { nodes.add(node); for (Node<T> child : traversalDirection.children(node)) { - nodeParents.computeIfAbsent(child.id(), id -> new HashSet<>(traversalDirection.parents(child))); + // This is not thread safe + if (nodeParents.get(child.id()) == null) { + nodeParents.put(child.id(), new HashSet<>(traversalDirection.parents(child))); + } nodeParents.get(child.id()).remove(node); if (nodeParents.get(child.id()).isEmpty()) { diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java index 2048383..55c71c1 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java @@ -27,6 +27,7 @@ import org.apache.servicecomb.saga.core.GraphBasedSaga; import org.apache.servicecomb.saga.core.Saga; import org.apache.servicecomb.saga.core.SagaContext; import org.apache.servicecomb.saga.core.SagaContextImpl; +import org.apache.servicecomb.saga.core.SagaRequest; import org.apache.servicecomb.saga.core.application.SagaFactory; import org.apache.servicecomb.saga.infrastructure.ContextAwareEventStore; import org.apache.servicecomb.saga.core.PersistentStore; @@ -49,7 +50,7 @@ public class GraphBasedSagaFactory implements SagaFactory { this.childrenExtractor = childrenExtractor; this.executorService = executorService; this.sagaTaskFactory = new SagaTaskFactory(retryDelay, persistentStore); - this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<>()); + this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<SagaRequest>()); } @Override diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java index 578cbb9..c5cd850 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java @@ -63,10 +63,11 @@ public class GraphBuilder { } } - requestNodes.values().stream() - .filter((node) -> node.children().isEmpty()) - .forEach(node -> node.addChild(leaf)); - + for(Node<SagaRequest> node : requestNodes.values()) { + if (node.children().isEmpty()) { + node.addChild(leaf); + } + } return new SingleLeafDirectedAcyclicGraph<>(root, leaf); } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java index 6011a98..f2869b3 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java @@ -45,26 +45,32 @@ public class GraphCycleDetectorImpl<T> implements GraphCycleDetector<T> { return unvisitedNodes(nodeParents); } + // This method is not thread safe private void traverse(Queue<Node<T>> orphanNodes, Map<Node<T>, Set<Node<T>>> nodeParents) { while (!orphanNodes.isEmpty()) { Node<T> node = orphanNodes.poll(); - node.children().forEach(child -> { - nodeParents.computeIfAbsent(child, n -> new HashSet<>(child.parents())) - .remove(node); - + for(Node<T> child : node.children()) { + Set<Node<T>> parent = nodeParents.get(child); + if (parent == null) { + parent = new HashSet<>(child.parents()); + nodeParents.put(child, parent); + } + parent.remove(node); if (nodeParents.get(child).isEmpty()) { orphanNodes.add(child); } - }); + } } } private Set<Node<T>> unvisitedNodes(Map<Node<T>, Set<Node<T>>> nodeParents) { - return nodeParents.entrySet() - .parallelStream() - .filter(parents -> !parents.getValue().isEmpty()) - .map(Entry::getKey) - .collect(Collectors.toSet()); + Set<Node<T>> result = new HashSet<>(); + for (Map.Entry<Node<T>, Set<Node<T>>> entry : nodeParents.entrySet()) { + if (!entry.getValue().isEmpty()) { + result.add(entry.getKey()); + } + } + return result; } } diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java index 0b3be0a..3e5bcad 100644 --- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java +++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java @@ -56,7 +56,9 @@ public class Node<T> { public void addChildren(Collection<Node<T>> nodes) { children.addAll(nodes); - nodes.forEach(node -> node.parents.add(this)); + for (Node<T> node : nodes) { + node.parents.add(this); + } } @Override diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java index c974bc4..0077802 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java @@ -24,4 +24,9 @@ public class CompensationImpl extends RestOperation implements Compensation { public CompensationImpl(String path, String method, Map<String, Map<String, String>> params) { super(path, method, params); } + + @Override + public int retries() { + return DEFAULT_RETRIES; + } } diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java index be4b9f4..449340c 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java @@ -22,6 +22,8 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; +import java.util.ArrayList; + import org.junit.Test; import org.mockito.Mockito; @@ -31,6 +33,7 @@ public class CompositeSagaResponseTest { private final SagaResponse response2 = Mockito.mock(SagaResponse.class); private final SagaResponse compositeSagaResponse = new CompositeSagaResponse(asList(response1, response2)); + private final SagaResponse emptySagaResponse = new CompositeSagaResponse(new ArrayList<SagaResponse>()); @Test public void succeededOnlyWhenAllAreSuccessful() throws Exception { @@ -70,4 +73,9 @@ public class CompositeSagaResponseTest { + "}\n" + "]")); } + + @Test + public void EmptyCompositeSagaResponse() { + assertThat(emptySagaResponse.body(), is("{}")); + } } diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java index a6e1dca..078f996 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java @@ -49,7 +49,12 @@ public class ForwardRecoveryTest { public void blowsUpWhenTaskIsNotCommittedWithFailRetryDelaySeconds() throws Exception { doThrow(Exception.class).when(transaction).send(serviceName, parentResponse); - Thread t = new Thread(() -> forwardRecovery.apply(sagaTask, sagaRequest, parentResponse)); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + forwardRecovery.apply(sagaTask, sagaRequest, parentResponse); + } + }); t.start(); Thread.sleep(400); t.interrupt(); diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java index 7f1990d..900bb8b 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java @@ -23,14 +23,20 @@ import static java.util.Collections.singletonMap; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import java.util.Collections; +import java.util.Map; + import org.junit.Test; public class RestOperationTest { + public static final Map<String, Map<String, String>> EMPTY_MAP_MAP = Collections.<String, Map<String, String>>emptyMap(); + public static final Map<String, String> EMPTY_MAP = Collections.<String, String>emptyMap(); + @Test public void blowsUpWhenGetMethodWithForm() { try { - new RestOperation("blah", "GET", singletonMap("form", emptyMap())); + new RestOperation("blah", "GET", singletonMap("form", EMPTY_MAP)); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body")); @@ -40,7 +46,7 @@ public class RestOperationTest { @Test public void blowsUpWhenGetMethodWithJson() { try { - new RestOperation("blah", "GET", singletonMap("json", emptyMap())); + new RestOperation("blah", "GET", singletonMap("json", EMPTY_MAP)); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body")); @@ -50,7 +56,7 @@ public class RestOperationTest { @Test public void blowsUpWhenDeleteMethodWithForm() { try { - new RestOperation("blah", "DELETE", singletonMap("form", emptyMap())); + new RestOperation("blah", "DELETE", singletonMap("form", EMPTY_MAP)); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body")); @@ -60,7 +66,7 @@ public class RestOperationTest { @Test public void blowsUpWhenDeleteMethodWithJson() { try { - new RestOperation("blah", "DELETE", singletonMap("json", emptyMap())); + new RestOperation("blah", "DELETE", singletonMap("json", EMPTY_MAP)); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body")); @@ -70,7 +76,7 @@ public class RestOperationTest { @Test public void blowsUpWhenMethodIsNotSupported() { try { - new RestOperation("blah", "foo", emptyMap()); + new RestOperation("blah", "foo", EMPTY_MAP_MAP ); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), is("Unsupported method foo")); @@ -80,7 +86,7 @@ public class RestOperationTest { @Test public void blowsUpWhenMethodIsNull() { try { - new RestOperation("blah", null, emptyMap()); + new RestOperation("blah", null, EMPTY_MAP_MAP); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), is("Unsupported method null")); diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java index 6da8f16..9f9bdc8 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -63,18 +64,26 @@ public class RetrySagaLogTest { public void exitOnInterruption() throws InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); - Future<?> future = executor.submit(() -> { - doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent); + Future<?> future = executor.submit(new Runnable() { + @Override + public void run() { + doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent); - retrySagaLog.offer(dummyEvent); - interrupted = true; + retrySagaLog.offer(dummyEvent); + interrupted = true; + } }); Thread.sleep(500); assertThat(future.cancel(true), is(true)); - await().atMost(2, TimeUnit.SECONDS).until(() -> interrupted); + await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() { + @Override + public Boolean call() { + return interrupted; + } + }); executor.shutdown(); } } diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java index b9e893f..d4f88c4 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java @@ -18,7 +18,6 @@ package org.apache.servicecomb.saga.core; import static org.apache.servicecomb.saga.core.Operation.TYPE_REST; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; @@ -30,9 +29,12 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.servicecomb.saga.core.application.SagaFactory; import org.hamcrest.Description; @@ -48,6 +50,7 @@ import org.apache.servicecomb.saga.infrastructure.EmbeddedEventStore; @SuppressWarnings("unchecked") public abstract class SagaExecutionComponentTestBase { + private static final String requestJson = "[\n" + " {\n" + " \"id\": \"request-1\",\n" @@ -88,20 +91,22 @@ public abstract class SagaExecutionComponentTestBase { + " \"requests\": " + anotherRequestJson + "\n" + "}"; + public static final Map<String, Map<String, String>> EMPTY_MAP = Collections.<String, Map<String, String>>emptyMap(); + private final SagaRequest request1 = new SagaRequestImpl( "request-1", "aaa", TYPE_REST, - new TransactionImpl("/rest/as", "post", emptyMap()), - new CompensationImpl("/rest/as", "delete", emptyMap()) + new TransactionImpl("/rest/as", "post", EMPTY_MAP), + new CompensationImpl("/rest/as", "delete", EMPTY_MAP) ); private final SagaRequest request2 = new SagaRequestImpl( "request-2", "bbb", TYPE_REST, - new TransactionImpl("/rest/bs", "post", emptyMap()), - new CompensationImpl("/rest/bs", "delete", emptyMap()) + new TransactionImpl("/rest/bs", "post", EMPTY_MAP), + new CompensationImpl("/rest/bs", "delete", EMPTY_MAP) ); private final SagaDefinition definition1 = new SagaDefinition() { @@ -174,10 +179,27 @@ public abstract class SagaExecutionComponentTestBase { @Test public void processRequestsInParallel() { - CompletableFuture.runAsync(() -> coordinator.run(sagaJson)); - CompletableFuture.runAsync(() -> coordinator.run(anotherSagaJson)); + ExecutorService executor = Executors.newFixedThreadPool(2); - waitAtMost(2, SECONDS).until(() -> eventStore.size() == 8); + executor.submit(new Runnable() { + @Override + public void run() { + coordinator.run(sagaJson); + } + }); + executor.submit(new Runnable() { + @Override + public void run() { + coordinator.run(anotherSagaJson); + } + }); + + waitAtMost(2, SECONDS).until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return eventStore.size() == 8; + } + }); assertThat(eventStore, containsInAnyOrder( eventWith(NoOpSagaRequest.SAGA_START_REQUEST, SagaStartedEvent.class), @@ -211,8 +233,8 @@ public abstract class SagaExecutionComponentTestBase { } private Matcher<SagaEvent> eventWith( - SagaRequest sagaRequest, - Class<?> type) { + final SagaRequest sagaRequest, + final Class<?> type) { return new TypeSafeMatcher<SagaEvent>() { @Override diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java index d005ee7..19f9802 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -54,6 +55,7 @@ import java.util.concurrent.CyclicBarrier; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.seanyinx.github.unit.scaffolding.Randomness; @@ -116,7 +118,7 @@ public class SagaIntegrationTest { // root - node1 - node2 - leaf @Before public void setUp() throws Exception { - when(childrenExtractor.fromJson(anyString())).thenReturn(emptySet()); + when(childrenExtractor.fromJson(anyString())).thenReturn(Collections.<String>emptySet()); when(childrenExtractor.fromJson(NONE_RESPONSE.body())).thenReturn(setOf("none")); when(transaction1.send(request1.serviceName(), EMPTY_RESPONSE)).thenReturn(transactionResponse1); @@ -169,20 +171,26 @@ public class SagaIntegrationTest { addExtraChildToNode1(); // barrier to make sure the two transactions starts at the same time - CyclicBarrier barrier = new CyclicBarrier(2); + final CyclicBarrier barrier = new CyclicBarrier(2); when(transaction2.send(request2.serviceName(), transactionResponse1)) .thenAnswer( - withAnswer(() -> { - barrier.await(); - Thread.sleep(100); - throw exception; + withAnswer(new Callable<SagaResponse>() { + @Override + public SagaResponse call() throws Exception { + barrier.await(); + Thread.sleep(100); + throw exception; + } })); when(transaction3.send(request3.serviceName(), transactionResponse1)) .thenAnswer( - withAnswer(() -> { - barrier.await(); - return transactionResponse3; + withAnswer(new Callable<SagaResponse>() { + @Override + public SagaResponse call() throws Exception { + barrier.await(); + return transactionResponse3; + } })); saga.run(); @@ -305,21 +313,27 @@ public class SagaIntegrationTest { addExtraChildToNode1(); // barrier to make sure the two transactions starts at the same time - CyclicBarrier barrier = new CyclicBarrier(2); + final CyclicBarrier barrier = new CyclicBarrier(2); when(transaction3.send(request3.serviceName(), transactionResponse1)) - .thenAnswer(withAnswer(() -> { - barrier.await(); - throw exception; - })); + .thenAnswer(withAnswer(new Callable<SagaResponse>() { + @Override + public SagaResponse call() throws Exception { + barrier.await(); + throw exception; + } + })); - CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); when(transaction2.send(request2.serviceName(), transactionResponse1)) - .thenAnswer(withAnswer(() -> { - barrier.await(); - latch.await(); - return transactionResponse2; - })).thenReturn(transactionResponse2); + .thenAnswer(withAnswer(new Callable<SagaResponse>() { + @Override + public SagaResponse call() throws Exception { + barrier.await(); + latch.await(); + return transactionResponse2; + } + })).thenReturn(transactionResponse2); saga.run(); @@ -646,8 +660,13 @@ public class SagaIntegrationTest { verify(compensation2, never()).send(request2.serviceName()); } - private Answer<SagaResponse> withAnswer(Callable<SagaResponse> callable) { - return invocationOnMock -> callable.call(); + private Answer<SagaResponse> withAnswer(final Callable<SagaResponse> callable) { + return new Answer<SagaResponse>() { + @Override + public SagaResponse answer(InvocationOnMock invocation) throws Throwable { + return callable.call(); + } + }; } private EventEnvelope envelope(SagaEvent event) { diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java index beb9607..3c54fcf 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java @@ -58,7 +58,7 @@ public class DirectedAcyclicGraphTraversalTest { @Test public void traverseGraphOneLevelPerStepFromRoot() { - Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromRootTraversalDirection<>()); + Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromRootTraversalDirection<String>()); Collection<Node<String>> nodes = traveller.nodes(); @@ -80,7 +80,7 @@ public class DirectedAcyclicGraphTraversalTest { @Test public void traverseGraphOneLevelPerStepFromLeaf() { - Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromLeafTraversalDirection<>()); + Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromLeafTraversalDirection<String>()); Collection<Node<String>> nodes = traveller.nodes(); diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java index 8703c7a..9a6c0f8 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java @@ -19,8 +19,6 @@ package org.apache.servicecomb.saga.core.dag; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; import static org.apache.servicecomb.saga.core.Operation.TYPE_REST; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; @@ -31,8 +29,12 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.Collection; -import java.util.stream.Collectors; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.servicecomb.saga.core.NoOpSagaRequest; import org.apache.servicecomb.saga.core.SagaException; @@ -49,28 +51,30 @@ import org.apache.servicecomb.saga.core.TransactionImpl; @SuppressWarnings("unchecked") public class GraphBuilderTest { + public static final Map<String, Map<String, String>> EMPTY_MAP = Collections.<String, Map<String, String>>emptyMap(); + private final SagaRequest request1 = new SagaRequestImpl( "request-aaa", "aaa", TYPE_REST, - new TransactionImpl("/rest/as", "post", emptyMap()), - new CompensationImpl("/rest/as","delete", emptyMap()) + new TransactionImpl("/rest/as", "post", EMPTY_MAP), + new CompensationImpl("/rest/as","delete", EMPTY_MAP) ); private final SagaRequest request2 = new SagaRequestImpl( "request-bbb", "bbb", TYPE_REST, - new TransactionImpl("/rest/bs", "post", emptyMap()), - new CompensationImpl("/rest/bs","delete", emptyMap()) + new TransactionImpl("/rest/bs", "post", EMPTY_MAP), + new CompensationImpl("/rest/bs","delete", EMPTY_MAP) ); private final SagaRequest request3 = new SagaRequestImpl( "request-ccc", "ccc", TYPE_REST, - new TransactionImpl("/rest/cs", "post", emptyMap()), - new CompensationImpl("/rest/cs","delete", emptyMap()), + new TransactionImpl("/rest/cs", "post", EMPTY_MAP), + new CompensationImpl("/rest/cs","delete", EMPTY_MAP), null, new String[]{"request-aaa", "request-bbb"} ); @@ -80,8 +84,8 @@ public class GraphBuilderTest { "request-duplicate-id", "xxx", TYPE_REST, - new TransactionImpl("/rest/xs", "post", emptyMap()), - new CompensationImpl("/rest/xs","delete", emptyMap()) + new TransactionImpl("/rest/xs", "post", EMPTY_MAP), + new CompensationImpl("/rest/xs","delete", EMPTY_MAP) ); private final SagaRequest[] duplicateRequests = {duplicateRequest, duplicateRequest}; @@ -90,14 +94,14 @@ public class GraphBuilderTest { @Before public void setUp() throws Exception { - when(detector.cycleJoints(any())).thenReturn(emptySet()); + when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>)any())).thenReturn((Set<Node<SagaRequest>>) Collections.EMPTY_SET); } @Test public void buildsGraphOfParallelRequests() { SingleLeafDirectedAcyclicGraph<SagaRequest> tasks = graphBuilder.build(requests); - Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new FromRootTraversalDirection<>()); + Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new FromRootTraversalDirection<SagaRequest>()); Collection<Node<SagaRequest>> nodes = traveller.nodes(); traveller.next(); @@ -130,7 +134,7 @@ public class GraphBuilderTest { @Test public void blowsUpWhenGraphContainsCycle() { reset(detector); - when(detector.cycleJoints(any())).thenReturn(singleton(new Node<>(0L, null))); + when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>) any())).thenReturn(singleton(new Node<SagaRequest>(0L, null))); try { graphBuilder.build(requests); @@ -141,8 +145,10 @@ public class GraphBuilderTest { } private Collection<SagaRequest> requestsOf(Collection<Node<SagaRequest>> nodes) { - return nodes.stream() - .map(Node::value) - .collect(Collectors.toList()); + List<SagaRequest> result = new ArrayList<>(); + for(Node<SagaRequest> node: nodes) { + result.add(node.value()); + } + return result; } } diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java index 934c54b..190beeb 100644 --- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java +++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java @@ -52,7 +52,6 @@ public class GraphCycleDetectorTest { node1.addChild(node3); Set<Node<String>> nodes = detector.cycleJoints(graph); - assertThat(nodes.isEmpty(), is(true)); } @@ -63,7 +62,7 @@ public class GraphCycleDetectorTest { node3.addChild(node1); Set<Node<String>> nodes = detector.cycleJoints(graph); - + assertThat(nodes, contains(node1)); } diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java index 443ba66..a70aafb 100644 --- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java +++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.format; import org.apache.servicecomb.saga.core.Fallback; import org.apache.servicecomb.saga.core.Operation; +import org.apache.servicecomb.saga.core.SagaResponse; import org.apache.servicecomb.saga.transports.TransportFactory; import com.fasterxml.jackson.annotation.JsonCreator; @@ -60,5 +61,15 @@ public interface JacksonFallback extends Fallback, TransportAware { public Operation with(TransportFactory transport) { return this; } + + @Override + public SagaResponse send(String address) { + return SUCCESSFUL_SAGA_RESPONSE; + } + + @Override + public SagaResponse send(String address, SagaResponse response) { + return send(address); + } } }