This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push:
new 7e586b5 SCB-853 Saga-core supports JDK7 now
7e586b5 is described below
commit 7e586b569c61b6faafe1d376922a49522fcba848
Author: Willem Jiang <[email protected]>
AuthorDate: Mon Aug 20 09:30:14 2018 +0800
SCB-853 Saga-core supports JDK7 now
---
saga-core/pom.xml | 10 ++++
.../servicecomb/saga/core/BackwardRecovery.java | 5 ++
.../apache/servicecomb/saga/core/Compensation.java | 32 +++++++++--
.../saga/core/CompositeSagaResponse.java | 31 ++++++++---
.../apache/servicecomb/saga/core/Descriptive.java | 4 +-
.../org/apache/servicecomb/saga/core/Fallback.java | 17 +++++-
.../servicecomb/saga/core/ForwardRecovery.java | 5 ++
.../servicecomb/saga/core/GraphBasedSaga.java | 15 +++---
.../saga/core/LoggingRecoveryPolicy.java | 5 ++
.../servicecomb/saga/core/NoOpSagaRequest.java | 6 +++
.../apache/servicecomb/saga/core/Operation.java | 8 +--
.../servicecomb/saga/core/RestOperation.java | 14 +++--
.../apache/servicecomb/saga/core/SagaContext.java | 4 +-
.../servicecomb/saga/core/SagaContextImpl.java | 35 +++++++-----
.../apache/servicecomb/saga/core/SagaEvent.java | 5 ++
.../apache/servicecomb/saga/core/SagaRequest.java | 6 +--
.../servicecomb/saga/core/SagaTaskFactory.java | 10 ++--
.../apache/servicecomb/saga/core/Transaction.java | 18 +++++++
.../servicecomb/saga/core/TransactionConsumer.java | 5 ++
.../saga/core/dag/ByLevelTraveller.java | 5 +-
.../saga/core/dag/GraphBasedSagaFactory.java | 3 +-
.../servicecomb/saga/core/dag/GraphBuilder.java | 9 ++--
.../saga/core/dag/GraphCycleDetectorImpl.java | 26 +++++----
.../org/apache/servicecomb/saga/core/dag/Node.java | 4 +-
.../servicecomb/saga/core/CompensationImpl.java | 5 ++
.../saga/core/CompositeSagaResponseTest.java | 8 +++
.../servicecomb/saga/core/ForwardRecoveryTest.java | 7 ++-
.../servicecomb/saga/core/RestOperationTest.java | 18 ++++---
.../servicecomb/saga/core/RetrySagaLogTest.java | 19 +++++--
.../saga/core/SagaExecutionComponentTestBase.java | 44 +++++++++++----
.../servicecomb/saga/core/SagaIntegrationTest.java | 63 ++++++++++++++--------
.../dag/DirectedAcyclicGraphTraversalTest.java | 4 +-
.../saga/core/dag/GraphBuilderTest.java | 40 ++++++++------
.../saga/core/dag/GraphCycleDetectorTest.java | 3 +-
.../servicecomb/saga/format/JacksonFallback.java | 11 ++++
35 files changed, 366 insertions(+), 138 deletions(-)
diff --git a/saga-core/pom.xml b/saga-core/pom.xml
index fd1b47c..e460b1a 100644
--- a/saga-core/pom.xml
+++ b/saga-core/pom.xml
@@ -95,6 +95,16 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
</plugins>
</build>
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
index e46de2f..799dbbf 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
@@ -46,4 +46,9 @@ public class BackwardRecovery implements RecoveryPolicy {
throw e;
}
}
+
+ @Override
+ public String description() {
+ return getClass().getSimpleName();
+ }
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
index f642011..a6a352b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
@@ -20,14 +20,40 @@ package org.apache.servicecomb.saga.core;
public interface Compensation extends Operation {
Compensation SAGA_START_COMPENSATION = new Compensation() {
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
+
+ @Override
+ public int retries() {
+ return DEFAULT_RETRIES;
+ }
};
Compensation SAGA_END_COMPENSATION = new Compensation() {
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
+
+ @Override
+ public int retries() {
+ return DEFAULT_RETRIES;
+ }
};
int DEFAULT_RETRIES = 3;
- default int retries() {
- return DEFAULT_RETRIES;
- }
+ int retries();
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
index 02ab48c..84753b9 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
@@ -18,7 +18,6 @@
package org.apache.servicecomb.saga.core;
import java.util.Collection;
-import java.util.Optional;
public class CompositeSagaResponse implements SagaResponse {
private final Collection<SagaResponse> responses;
@@ -29,19 +28,35 @@ public class CompositeSagaResponse implements SagaResponse {
@Override
public boolean succeeded() {
- return responses.stream().allMatch(SagaResponse::succeeded);
+ if (responses.size() > 0) {
+ boolean result = true;
+ for (SagaResponse response : responses) {
+ result = result && response.succeeded();
+ }
+ return result;
+ } else {
+ return false;
+ }
}
@Override
public String body() {
- Optional<String> reduce = responses.stream()
- .map(SagaResponse::body)
- .reduce((a, b) -> a + ", " + b)
- .map(combined -> "[" + combined + "]");
-
- return reduce.orElse("{}");
+ StringBuffer result = new StringBuffer();
+ if (responses.size() == 0) {
+ result.append("{}");
+ } else {
+ result.append("[");
+ for (SagaResponse response : responses) {
+ result.append(response.body());
+ result.append(", ");
+ }
+ result.delete(result.length()-2, result.length());
+ result.append("]");
+ }
+ return result.toString();
}
+
public Collection<SagaResponse> responses() {
return responses;
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
index bd367af..e8b4f4e 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
@@ -19,7 +19,5 @@ package org.apache.servicecomb.saga.core;
interface Descriptive {
- default String description() {
- return getClass().getSimpleName();
- }
+ String description();
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
index fca68a0..8c1068b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
@@ -19,7 +19,22 @@ package org.apache.servicecomb.saga.core;
public interface Fallback extends Operation {
- Fallback NOP_FALLBACK = () -> TYPE_NOP;
+ Fallback NOP_FALLBACK = new Fallback() {
+ @Override
+ public String type() {
+ return TYPE_NOP;
+ }
+
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
+ };
String type();
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
index c783990..75e16ee 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
@@ -50,4 +50,9 @@ public class ForwardRecovery implements RecoveryPolicy {
throw new TransactionFailedException(ignored);
}
}
+
+ @Override
+ public String description() {
+ return getClass().getSimpleName();
+ }
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
index f4544e3..69347fc 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
@@ -65,15 +65,15 @@ public class GraphBasedSaga implements Saga {
this.tasks = tasks;
this.transactionTaskRunner = new TaskRunner(
- traveller(sagaTaskGraph, new FromRootTraversalDirection<>()),
+ traveller(sagaTaskGraph, new
FromRootTraversalDirection<SagaRequest>()),
new TransactionTaskConsumer(
tasks,
sagaContext,
- new ExecutorCompletionService<>(executor)));
+ new ExecutorCompletionService<Operation>(executor)));
this.sagaContext = sagaContext;
this.compensationTaskRunner = new TaskRunner(
- traveller(sagaTaskGraph, new FromLeafTraversalDirection<>()),
+ traveller(sagaTaskGraph, new
FromLeafTraversalDirection<SagaRequest>()),
new CompensationTaskConsumer(tasks, sagaContext));
currentTaskRunner = transactionTaskRunner;
@@ -92,9 +92,12 @@ public class GraphBasedSaga implements Saga {
log.error("Failed to run operation", e);
currentTaskRunner = compensationTaskRunner;
- sagaContext.handleHangingTransactions(request -> {
- tasks.get(request.task()).commit(request,
sagaContext.responseOf(request.parents()));
- tasks.get(request.task()).compensate(request);
+ sagaContext.handleHangingTransactions(new
TransactionConsumer<SagaRequest>() {
+ @Override
+ public void accept(SagaRequest request) {
+ tasks.get(request.task()).commit(request,
sagaContext.responseOf(request.parents()));
+ tasks.get(request.task()).compensate(request);
+ }
});
}
} while (currentTaskRunner.hasNext());
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
index 9173eca..593aa91 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
@@ -43,4 +43,9 @@ public class LoggingRecoveryPolicy implements RecoveryPolicy {
log.info("Completed request id={} for service {}", request.id(),
request.serviceName());
return response;
}
+
+ @Override
+ public String description() {
+ return getClass().getSimpleName();
+ }
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
index 56ad05e..93bbda9 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.core;
import static
org.apache.servicecomb.saga.core.Compensation.SAGA_END_COMPENSATION;
import static
org.apache.servicecomb.saga.core.Compensation.SAGA_START_COMPENSATION;
+import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK;
import static org.apache.servicecomb.saga.core.Operation.TYPE_NOP;
import static org.apache.servicecomb.saga.core.SagaTask.SAGA_END_TASK;
import static org.apache.servicecomb.saga.core.SagaTask.SAGA_START_TASK;
@@ -65,6 +66,11 @@ public class NoOpSagaRequest implements SagaRequest {
}
@Override
+ public Fallback fallback() {
+ return NOP_FALLBACK;
+ }
+
+ @Override
public String serviceName() {
return "Saga";
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
index 13b6f8a..99e760b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
@@ -23,11 +23,7 @@ public interface Operation {
String TYPE_REST = "rest";
SagaResponse SUCCESSFUL_SAGA_RESPONSE = new
SuccessfulSagaResponse("success");
- default SagaResponse send(String address) {
- return SUCCESSFUL_SAGA_RESPONSE;
- }
+ SagaResponse send(String address);
- default SagaResponse send(String address, SagaResponse response) {
- return send(address);
- }
+ SagaResponse send(String address, SagaResponse response);
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
index 5cce22d..153eb98 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
@@ -17,8 +17,6 @@
package org.apache.servicecomb.saga.core;
-import static java.util.Collections.emptyMap;
-
import
org.apache.servicecomb.saga.core.application.interpreter.RestRequestChecker;
import java.util.Map;
@@ -33,7 +31,7 @@ public class RestOperation implements Operation {
this.path = path;
this.method = method;
- this.params = params == null? emptyMap() : params;
+ this.params = params == null? java.util.Collections.<String, Map<String,
String>>emptyMap() : params;
}
public String path() {
@@ -56,4 +54,14 @@ public class RestOperation implements Operation {
", params=" + params +
'}';
}
+
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
index 309be44..2f76031 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
@@ -17,8 +17,6 @@
package org.apache.servicecomb.saga.core;
-import java.util.function.Consumer;
-
public interface SagaContext extends EventContext {
boolean isCompensationStarted();
@@ -26,7 +24,7 @@ public interface SagaContext extends EventContext {
boolean isCompensationCompleted(SagaRequest request);
- void handleHangingTransactions(Consumer<SagaRequest> consumer);
+ void handleHangingTransactions(TransactionConsumer<SagaRequest> consumer);
SagaResponse responseOf(String requestId);
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
index 610ae27..bc4253c 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
@@ -17,16 +17,16 @@
package org.apache.servicecomb.saga.core;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
+
import org.apache.servicecomb.saga.core.application.interpreter.FromJsonFormat;
@@ -86,7 +86,7 @@ public class SagaContextImpl implements SagaContext {
}
@Override
- public void handleHangingTransactions(Consumer<SagaRequest> consumer) {
+ public void handleHangingTransactions(TransactionConsumer<SagaRequest>
consumer) {
for (Iterator<SagaRequest> iterator =
hangingTransactions.values().iterator(); iterator.hasNext(); ) {
consumer.accept(iterator.next());
}
@@ -94,13 +94,19 @@ public class SagaContextImpl implements SagaContext {
@Override
public SagaResponse responseOf(String requestId) {
- return completedTransactions.getOrDefault(requestId,
SagaResponse.NONE_RESPONSE);
+ SagaResponse response = completedTransactions.get(requestId);
+ if (response == null) {
+ response = SagaResponse.NONE_RESPONSE;
+ }
+ return response;
}
private List<SagaResponse> responsesOf(String[] parentRequestIds) {
- return Arrays.stream(parentRequestIds)
- .map(this::responseOf)
- .collect(Collectors.toList());
+ List<SagaResponse> result = new ArrayList<>();
+ for(String parentRequestId: parentRequestIds) {
+ result.add(responseOf(parentRequestId));
+ }
+ return result;
}
@Override
@@ -125,9 +131,14 @@ public class SagaContextImpl implements SagaContext {
}
private Set<String> chosenChildrenOf(String[] parentRequestIds) {
- return Arrays.stream(parentRequestIds)
- .map(this::responseOf)
- .flatMap(sagaResponse ->
childrenExtractor.fromJson(sagaResponse.body()).stream())
- .collect(Collectors.toSet());
+ Set<String> result = new HashSet<>();
+ for(String parentRequestId: parentRequestIds) {
+ SagaResponse response = responseOf(parentRequestId);
+ Set<String> jsons = childrenExtractor.fromJson(response.body());
+ for (String json : jsons) {
+ result.add(json);
+ }
+ }
+ return result;
}
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
index ceb0ebe..ffe31d0 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
@@ -36,4 +36,9 @@ public abstract class SagaEvent implements Descriptive {
public String json(ToJsonFormat toJsonFormat) {
return "{}";
}
+
+ @Override
+ public String description() {
+ return getClass().getSimpleName();
+ }
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
index 337abdd..4578e42 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
@@ -17,8 +17,6 @@
package org.apache.servicecomb.saga.core;
-import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK;
-
public interface SagaRequest {
String PARAM_FORM = "form";
@@ -30,9 +28,7 @@ public interface SagaRequest {
Compensation compensation();
- default Fallback fallback() {
- return NOP_FALLBACK;
- }
+ Fallback fallback();
String serviceName();
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
index 0fd2450..b6d9585 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
@@ -36,12 +36,12 @@ public class SagaTaskFactory {
retrySagaLog = new RetrySagaLog(persistentStore, retryDelay);
}
- public Map<String, SagaTask> sagaTasks(String sagaId,
- String requestJson,
- RecoveryPolicy recoveryPolicy,
- EventStore sagaLog) {
+ public Map<String, SagaTask> sagaTasks(final String sagaId,
+ final String requestJson,
+ final RecoveryPolicy recoveryPolicy,
+ final EventStore sagaLog) {
- SagaLog compositeSagaLog = compositeSagaLog(sagaLog, persistentStore);
+ final SagaLog compositeSagaLog = compositeSagaLog(sagaLog,
persistentStore);
return new HashMap<String, SagaTask>() {{
put(SagaTask.SAGA_START_TASK, new SagaStartTask(sagaId, requestJson,
compositeSagaLog));
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
index fafb845..389f8fa 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
@@ -20,8 +20,26 @@ package org.apache.servicecomb.saga.core;
public interface Transaction extends Operation {
Transaction SAGA_START_TRANSACTION = new Transaction() {
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
};
Transaction SAGA_END_TRANSACTION = new Transaction() {
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
};
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
new file mode 100644
index 0000000..b5bb098
--- /dev/null
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
@@ -0,0 +1,5 @@
+package org.apache.servicecomb.saga.core;
+
+public interface TransactionConsumer<T> {
+ void accept(T request);
+}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
index 22bc874..2489117 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
@@ -59,7 +59,10 @@ public class ByLevelTraveller<T> implements Traveller<T> {
nodes.add(node);
for (Node<T> child : traversalDirection.children(node)) {
- nodeParents.computeIfAbsent(child.id(), id -> new
HashSet<>(traversalDirection.parents(child)));
+ // This is not thread safe
+ if (nodeParents.get(child.id()) == null) {
+ nodeParents.put(child.id(), new
HashSet<>(traversalDirection.parents(child)));
+ }
nodeParents.get(child.id()).remove(node);
if (nodeParents.get(child.id()).isEmpty()) {
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
index 2048383..55c71c1 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
@@ -27,6 +27,7 @@ import org.apache.servicecomb.saga.core.GraphBasedSaga;
import org.apache.servicecomb.saga.core.Saga;
import org.apache.servicecomb.saga.core.SagaContext;
import org.apache.servicecomb.saga.core.SagaContextImpl;
+import org.apache.servicecomb.saga.core.SagaRequest;
import org.apache.servicecomb.saga.core.application.SagaFactory;
import org.apache.servicecomb.saga.infrastructure.ContextAwareEventStore;
import org.apache.servicecomb.saga.core.PersistentStore;
@@ -49,7 +50,7 @@ public class GraphBasedSagaFactory implements SagaFactory {
this.childrenExtractor = childrenExtractor;
this.executorService = executorService;
this.sagaTaskFactory = new SagaTaskFactory(retryDelay, persistentStore);
- this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<>());
+ this.graphBuilder = new GraphBuilder(new
GraphCycleDetectorImpl<SagaRequest>());
}
@Override
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
index 578cbb9..c5cd850 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
@@ -63,10 +63,11 @@ public class GraphBuilder {
}
}
- requestNodes.values().stream()
- .filter((node) -> node.children().isEmpty())
- .forEach(node -> node.addChild(leaf));
-
+ for(Node<SagaRequest> node : requestNodes.values()) {
+ if (node.children().isEmpty()) {
+ node.addChild(leaf);
+ }
+ }
return new SingleLeafDirectedAcyclicGraph<>(root, leaf);
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
index 6011a98..f2869b3 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
@@ -45,26 +45,32 @@ public class GraphCycleDetectorImpl<T> implements
GraphCycleDetector<T> {
return unvisitedNodes(nodeParents);
}
+ // This method is not thread safe
private void traverse(Queue<Node<T>> orphanNodes, Map<Node<T>, Set<Node<T>>>
nodeParents) {
while (!orphanNodes.isEmpty()) {
Node<T> node = orphanNodes.poll();
- node.children().forEach(child -> {
- nodeParents.computeIfAbsent(child, n -> new HashSet<>(child.parents()))
- .remove(node);
-
+ for(Node<T> child : node.children()) {
+ Set<Node<T>> parent = nodeParents.get(child);
+ if (parent == null) {
+ parent = new HashSet<>(child.parents());
+ nodeParents.put(child, parent);
+ }
+ parent.remove(node);
if (nodeParents.get(child).isEmpty()) {
orphanNodes.add(child);
}
- });
+ }
}
}
private Set<Node<T>> unvisitedNodes(Map<Node<T>, Set<Node<T>>> nodeParents) {
- return nodeParents.entrySet()
- .parallelStream()
- .filter(parents -> !parents.getValue().isEmpty())
- .map(Entry::getKey)
- .collect(Collectors.toSet());
+ Set<Node<T>> result = new HashSet<>();
+ for (Map.Entry<Node<T>, Set<Node<T>>> entry : nodeParents.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ result.add(entry.getKey());
+ }
+ }
+ return result;
}
}
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
index 0b3be0a..3e5bcad 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
@@ -56,7 +56,9 @@ public class Node<T> {
public void addChildren(Collection<Node<T>> nodes) {
children.addAll(nodes);
- nodes.forEach(node -> node.parents.add(this));
+ for (Node<T> node : nodes) {
+ node.parents.add(this);
+ }
}
@Override
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
index c974bc4..0077802 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
@@ -24,4 +24,9 @@ public class CompensationImpl extends RestOperation
implements Compensation {
public CompensationImpl(String path, String method, Map<String, Map<String,
String>> params) {
super(path, method, params);
}
+
+ @Override
+ public int retries() {
+ return DEFAULT_RETRIES;
+ }
}
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
index be4b9f4..449340c 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
@@ -22,6 +22,8 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+
import org.junit.Test;
import org.mockito.Mockito;
@@ -31,6 +33,7 @@ public class CompositeSagaResponseTest {
private final SagaResponse response2 = Mockito.mock(SagaResponse.class);
private final SagaResponse compositeSagaResponse = new
CompositeSagaResponse(asList(response1, response2));
+ private final SagaResponse emptySagaResponse = new CompositeSagaResponse(new
ArrayList<SagaResponse>());
@Test
public void succeededOnlyWhenAllAreSuccessful() throws Exception {
@@ -70,4 +73,9 @@ public class CompositeSagaResponseTest {
+ "}\n"
+ "]"));
}
+
+ @Test
+ public void EmptyCompositeSagaResponse() {
+ assertThat(emptySagaResponse.body(), is("{}"));
+ }
}
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
index a6e1dca..078f996 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
@@ -49,7 +49,12 @@ public class ForwardRecoveryTest {
public void blowsUpWhenTaskIsNotCommittedWithFailRetryDelaySeconds() throws
Exception {
doThrow(Exception.class).when(transaction).send(serviceName,
parentResponse);
- Thread t = new Thread(() -> forwardRecovery.apply(sagaTask, sagaRequest,
parentResponse));
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ forwardRecovery.apply(sagaTask, sagaRequest, parentResponse);
+ }
+ });
t.start();
Thread.sleep(400);
t.interrupt();
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
index 7f1990d..900bb8b 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
@@ -23,14 +23,20 @@ import static java.util.Collections.singletonMap;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import java.util.Collections;
+import java.util.Map;
+
import org.junit.Test;
public class RestOperationTest {
+ public static final Map<String, Map<String, String>> EMPTY_MAP_MAP =
Collections.<String, Map<String, String>>emptyMap();
+ public static final Map<String, String> EMPTY_MAP = Collections.<String,
String>emptyMap();
+
@Test
public void blowsUpWhenGetMethodWithForm() {
try {
- new RestOperation("blah", "GET", singletonMap("form", emptyMap()));
+ new RestOperation("blah", "GET", singletonMap("form", EMPTY_MAP));
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a
body"));
@@ -40,7 +46,7 @@ public class RestOperationTest {
@Test
public void blowsUpWhenGetMethodWithJson() {
try {
- new RestOperation("blah", "GET", singletonMap("json", emptyMap()));
+ new RestOperation("blah", "GET", singletonMap("json", EMPTY_MAP));
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a
body"));
@@ -50,7 +56,7 @@ public class RestOperationTest {
@Test
public void blowsUpWhenDeleteMethodWithForm() {
try {
- new RestOperation("blah", "DELETE", singletonMap("form", emptyMap()));
+ new RestOperation("blah", "DELETE", singletonMap("form", EMPTY_MAP));
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a
body"));
@@ -60,7 +66,7 @@ public class RestOperationTest {
@Test
public void blowsUpWhenDeleteMethodWithJson() {
try {
- new RestOperation("blah", "DELETE", singletonMap("json", emptyMap()));
+ new RestOperation("blah", "DELETE", singletonMap("json", EMPTY_MAP));
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a
body"));
@@ -70,7 +76,7 @@ public class RestOperationTest {
@Test
public void blowsUpWhenMethodIsNotSupported() {
try {
- new RestOperation("blah", "foo", emptyMap());
+ new RestOperation("blah", "foo", EMPTY_MAP_MAP );
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("Unsupported method foo"));
@@ -80,7 +86,7 @@ public class RestOperationTest {
@Test
public void blowsUpWhenMethodIsNull() {
try {
- new RestOperation("blah", null, emptyMap());
+ new RestOperation("blah", null, EMPTY_MAP_MAP);
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("Unsupported method null"));
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
index 6da8f16..9f9bdc8 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -63,18 +64,26 @@ public class RetrySagaLogTest {
public void exitOnInterruption() throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<?> future = executor.submit(() -> {
- doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent);
+ Future<?> future = executor.submit(new Runnable() {
+ @Override
+ public void run() {
+
doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent);
- retrySagaLog.offer(dummyEvent);
- interrupted = true;
+ retrySagaLog.offer(dummyEvent);
+ interrupted = true;
+ }
});
Thread.sleep(500);
assertThat(future.cancel(true), is(true));
- await().atMost(2, TimeUnit.SECONDS).until(() -> interrupted);
+ await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() {
+ return interrupted;
+ }
+ });
executor.shutdown();
}
}
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
index b9e893f..d4f88c4 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
@@ -18,7 +18,6 @@
package org.apache.servicecomb.saga.core;
import static org.apache.servicecomb.saga.core.Operation.TYPE_REST;
-import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -30,9 +29,12 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.servicecomb.saga.core.application.SagaFactory;
import org.hamcrest.Description;
@@ -48,6 +50,7 @@ import
org.apache.servicecomb.saga.infrastructure.EmbeddedEventStore;
@SuppressWarnings("unchecked")
public abstract class SagaExecutionComponentTestBase {
+
private static final String requestJson = "[\n"
+ " {\n"
+ " \"id\": \"request-1\",\n"
@@ -88,20 +91,22 @@ public abstract class SagaExecutionComponentTestBase {
+ " \"requests\": " + anotherRequestJson + "\n"
+ "}";
+ public static final Map<String, Map<String, String>> EMPTY_MAP =
Collections.<String, Map<String, String>>emptyMap();
+
private final SagaRequest request1 = new SagaRequestImpl(
"request-1",
"aaa",
TYPE_REST,
- new TransactionImpl("/rest/as", "post", emptyMap()),
- new CompensationImpl("/rest/as", "delete", emptyMap())
+ new TransactionImpl("/rest/as", "post", EMPTY_MAP),
+ new CompensationImpl("/rest/as", "delete", EMPTY_MAP)
);
private final SagaRequest request2 = new SagaRequestImpl(
"request-2",
"bbb",
TYPE_REST,
- new TransactionImpl("/rest/bs", "post", emptyMap()),
- new CompensationImpl("/rest/bs", "delete", emptyMap())
+ new TransactionImpl("/rest/bs", "post", EMPTY_MAP),
+ new CompensationImpl("/rest/bs", "delete", EMPTY_MAP)
);
private final SagaDefinition definition1 = new SagaDefinition() {
@@ -174,10 +179,27 @@ public abstract class SagaExecutionComponentTestBase {
@Test
public void processRequestsInParallel() {
- CompletableFuture.runAsync(() -> coordinator.run(sagaJson));
- CompletableFuture.runAsync(() -> coordinator.run(anotherSagaJson));
+ ExecutorService executor = Executors.newFixedThreadPool(2);
- waitAtMost(2, SECONDS).until(() -> eventStore.size() == 8);
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ coordinator.run(sagaJson);
+ }
+ });
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ coordinator.run(anotherSagaJson);
+ }
+ });
+
+ waitAtMost(2, SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return eventStore.size() == 8;
+ }
+ });
assertThat(eventStore, containsInAnyOrder(
eventWith(NoOpSagaRequest.SAGA_START_REQUEST, SagaStartedEvent.class),
@@ -211,8 +233,8 @@ public abstract class SagaExecutionComponentTestBase {
}
private Matcher<SagaEvent> eventWith(
- SagaRequest sagaRequest,
- Class<?> type) {
+ final SagaRequest sagaRequest,
+ final Class<?> type) {
return new TypeSafeMatcher<SagaEvent>() {
@Override
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
index d005ee7..19f9802 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -54,6 +55,7 @@ import java.util.concurrent.CyclicBarrier;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.seanyinx.github.unit.scaffolding.Randomness;
@@ -116,7 +118,7 @@ public class SagaIntegrationTest {
// root - node1 - node2 - leaf
@Before
public void setUp() throws Exception {
- when(childrenExtractor.fromJson(anyString())).thenReturn(emptySet());
+
when(childrenExtractor.fromJson(anyString())).thenReturn(Collections.<String>emptySet());
when(childrenExtractor.fromJson(NONE_RESPONSE.body())).thenReturn(setOf("none"));
when(transaction1.send(request1.serviceName(),
EMPTY_RESPONSE)).thenReturn(transactionResponse1);
@@ -169,20 +171,26 @@ public class SagaIntegrationTest {
addExtraChildToNode1();
// barrier to make sure the two transactions starts at the same time
- CyclicBarrier barrier = new CyclicBarrier(2);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
when(transaction2.send(request2.serviceName(), transactionResponse1))
.thenAnswer(
- withAnswer(() -> {
- barrier.await();
- Thread.sleep(100);
- throw exception;
+ withAnswer(new Callable<SagaResponse>() {
+ @Override
+ public SagaResponse call() throws Exception {
+ barrier.await();
+ Thread.sleep(100);
+ throw exception;
+ }
}));
when(transaction3.send(request3.serviceName(), transactionResponse1))
.thenAnswer(
- withAnswer(() -> {
- barrier.await();
- return transactionResponse3;
+ withAnswer(new Callable<SagaResponse>() {
+ @Override
+ public SagaResponse call() throws Exception {
+ barrier.await();
+ return transactionResponse3;
+ }
}));
saga.run();
@@ -305,21 +313,27 @@ public class SagaIntegrationTest {
addExtraChildToNode1();
// barrier to make sure the two transactions starts at the same time
- CyclicBarrier barrier = new CyclicBarrier(2);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
when(transaction3.send(request3.serviceName(), transactionResponse1))
- .thenAnswer(withAnswer(() -> {
- barrier.await();
- throw exception;
- }));
+ .thenAnswer(withAnswer(new Callable<SagaResponse>() {
+ @Override
+ public SagaResponse call() throws Exception {
+ barrier.await();
+ throw exception;
+ }
+ }));
- CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
when(transaction2.send(request2.serviceName(), transactionResponse1))
- .thenAnswer(withAnswer(() -> {
- barrier.await();
- latch.await();
- return transactionResponse2;
- })).thenReturn(transactionResponse2);
+ .thenAnswer(withAnswer(new Callable<SagaResponse>() {
+ @Override
+ public SagaResponse call() throws Exception {
+ barrier.await();
+ latch.await();
+ return transactionResponse2;
+ }
+ })).thenReturn(transactionResponse2);
saga.run();
@@ -646,8 +660,13 @@ public class SagaIntegrationTest {
verify(compensation2, never()).send(request2.serviceName());
}
- private Answer<SagaResponse> withAnswer(Callable<SagaResponse> callable) {
- return invocationOnMock -> callable.call();
+ private Answer<SagaResponse> withAnswer(final Callable<SagaResponse>
callable) {
+ return new Answer<SagaResponse>() {
+ @Override
+ public SagaResponse answer(InvocationOnMock invocation) throws Throwable
{
+ return callable.call();
+ }
+ };
}
private EventEnvelope envelope(SagaEvent event) {
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
index beb9607..3c54fcf 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
@@ -58,7 +58,7 @@ public class DirectedAcyclicGraphTraversalTest {
@Test
public void traverseGraphOneLevelPerStepFromRoot() {
- Traveller<String> traveller = new ByLevelTraveller<>(dag, new
FromRootTraversalDirection<>());
+ Traveller<String> traveller = new ByLevelTraveller<>(dag, new
FromRootTraversalDirection<String>());
Collection<Node<String>> nodes = traveller.nodes();
@@ -80,7 +80,7 @@ public class DirectedAcyclicGraphTraversalTest {
@Test
public void traverseGraphOneLevelPerStepFromLeaf() {
- Traveller<String> traveller = new ByLevelTraveller<>(dag, new
FromLeafTraversalDirection<>());
+ Traveller<String> traveller = new ByLevelTraveller<>(dag, new
FromLeafTraversalDirection<String>());
Collection<Node<String>> nodes = traveller.nodes();
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
index 8703c7a..9a6c0f8 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
@@ -19,8 +19,6 @@ package org.apache.servicecomb.saga.core.dag;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.apache.servicecomb.saga.core.Operation.TYPE_REST;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -31,8 +29,12 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.servicecomb.saga.core.NoOpSagaRequest;
import org.apache.servicecomb.saga.core.SagaException;
@@ -49,28 +51,30 @@ import org.apache.servicecomb.saga.core.TransactionImpl;
@SuppressWarnings("unchecked")
public class GraphBuilderTest {
+ public static final Map<String, Map<String, String>> EMPTY_MAP =
Collections.<String, Map<String, String>>emptyMap();
+
private final SagaRequest request1 = new SagaRequestImpl(
"request-aaa",
"aaa",
TYPE_REST,
- new TransactionImpl("/rest/as", "post", emptyMap()),
- new CompensationImpl("/rest/as","delete", emptyMap())
+ new TransactionImpl("/rest/as", "post", EMPTY_MAP),
+ new CompensationImpl("/rest/as","delete", EMPTY_MAP)
);
private final SagaRequest request2 = new SagaRequestImpl(
"request-bbb",
"bbb",
TYPE_REST,
- new TransactionImpl("/rest/bs", "post", emptyMap()),
- new CompensationImpl("/rest/bs","delete", emptyMap())
+ new TransactionImpl("/rest/bs", "post", EMPTY_MAP),
+ new CompensationImpl("/rest/bs","delete", EMPTY_MAP)
);
private final SagaRequest request3 = new SagaRequestImpl(
"request-ccc",
"ccc",
TYPE_REST,
- new TransactionImpl("/rest/cs", "post", emptyMap()),
- new CompensationImpl("/rest/cs","delete", emptyMap()),
+ new TransactionImpl("/rest/cs", "post", EMPTY_MAP),
+ new CompensationImpl("/rest/cs","delete", EMPTY_MAP),
null,
new String[]{"request-aaa", "request-bbb"}
);
@@ -80,8 +84,8 @@ public class GraphBuilderTest {
"request-duplicate-id",
"xxx",
TYPE_REST,
- new TransactionImpl("/rest/xs", "post", emptyMap()),
- new CompensationImpl("/rest/xs","delete", emptyMap())
+ new TransactionImpl("/rest/xs", "post", EMPTY_MAP),
+ new CompensationImpl("/rest/xs","delete", EMPTY_MAP)
);
private final SagaRequest[] duplicateRequests = {duplicateRequest,
duplicateRequest};
@@ -90,14 +94,14 @@ public class GraphBuilderTest {
@Before
public void setUp() throws Exception {
- when(detector.cycleJoints(any())).thenReturn(emptySet());
+
when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>)any())).thenReturn((Set<Node<SagaRequest>>)
Collections.EMPTY_SET);
}
@Test
public void buildsGraphOfParallelRequests() {
SingleLeafDirectedAcyclicGraph<SagaRequest> tasks =
graphBuilder.build(requests);
- Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new
FromRootTraversalDirection<>());
+ Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new
FromRootTraversalDirection<SagaRequest>());
Collection<Node<SagaRequest>> nodes = traveller.nodes();
traveller.next();
@@ -130,7 +134,7 @@ public class GraphBuilderTest {
@Test
public void blowsUpWhenGraphContainsCycle() {
reset(detector);
- when(detector.cycleJoints(any())).thenReturn(singleton(new Node<>(0L,
null)));
+ when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>)
any())).thenReturn(singleton(new Node<SagaRequest>(0L, null)));
try {
graphBuilder.build(requests);
@@ -141,8 +145,10 @@ public class GraphBuilderTest {
}
private Collection<SagaRequest> requestsOf(Collection<Node<SagaRequest>>
nodes) {
- return nodes.stream()
- .map(Node::value)
- .collect(Collectors.toList());
+ List<SagaRequest> result = new ArrayList<>();
+ for(Node<SagaRequest> node: nodes) {
+ result.add(node.value());
+ }
+ return result;
}
}
diff --git
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
index 934c54b..190beeb 100644
---
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
+++
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
@@ -52,7 +52,6 @@ public class GraphCycleDetectorTest {
node1.addChild(node3);
Set<Node<String>> nodes = detector.cycleJoints(graph);
-
assertThat(nodes.isEmpty(), is(true));
}
@@ -63,7 +62,7 @@ public class GraphCycleDetectorTest {
node3.addChild(node1);
Set<Node<String>> nodes = detector.cycleJoints(graph);
-
+
assertThat(nodes, contains(node1));
}
diff --git
a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
index 443ba66..a70aafb 100644
---
a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
+++
b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.format;
import org.apache.servicecomb.saga.core.Fallback;
import org.apache.servicecomb.saga.core.Operation;
+import org.apache.servicecomb.saga.core.SagaResponse;
import org.apache.servicecomb.saga.transports.TransportFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -60,5 +61,15 @@ public interface JacksonFallback extends Fallback,
TransportAware {
public Operation with(TransportFactory transport) {
return this;
}
+
+ @Override
+ public SagaResponse send(String address) {
+ return SUCCESSFUL_SAGA_RESPONSE;
+ }
+
+ @Override
+ public SagaResponse send(String address, SagaResponse response) {
+ return send(address);
+ }
}
}