This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e586b5  SCB-853 Saga-core supports JDK7 now
7e586b5 is described below

commit 7e586b569c61b6faafe1d376922a49522fcba848
Author: Willem Jiang <jiangni...@huawei.com>
AuthorDate: Mon Aug 20 09:30:14 2018 +0800

    SCB-853 Saga-core supports JDK7 now
---
 saga-core/pom.xml                                  | 10 ++++
 .../servicecomb/saga/core/BackwardRecovery.java    |  5 ++
 .../apache/servicecomb/saga/core/Compensation.java | 32 +++++++++--
 .../saga/core/CompositeSagaResponse.java           | 31 ++++++++---
 .../apache/servicecomb/saga/core/Descriptive.java  |  4 +-
 .../org/apache/servicecomb/saga/core/Fallback.java | 17 +++++-
 .../servicecomb/saga/core/ForwardRecovery.java     |  5 ++
 .../servicecomb/saga/core/GraphBasedSaga.java      | 15 +++---
 .../saga/core/LoggingRecoveryPolicy.java           |  5 ++
 .../servicecomb/saga/core/NoOpSagaRequest.java     |  6 +++
 .../apache/servicecomb/saga/core/Operation.java    |  8 +--
 .../servicecomb/saga/core/RestOperation.java       | 14 +++--
 .../apache/servicecomb/saga/core/SagaContext.java  |  4 +-
 .../servicecomb/saga/core/SagaContextImpl.java     | 35 +++++++-----
 .../apache/servicecomb/saga/core/SagaEvent.java    |  5 ++
 .../apache/servicecomb/saga/core/SagaRequest.java  |  6 +--
 .../servicecomb/saga/core/SagaTaskFactory.java     | 10 ++--
 .../apache/servicecomb/saga/core/Transaction.java  | 18 +++++++
 .../servicecomb/saga/core/TransactionConsumer.java |  5 ++
 .../saga/core/dag/ByLevelTraveller.java            |  5 +-
 .../saga/core/dag/GraphBasedSagaFactory.java       |  3 +-
 .../servicecomb/saga/core/dag/GraphBuilder.java    |  9 ++--
 .../saga/core/dag/GraphCycleDetectorImpl.java      | 26 +++++----
 .../org/apache/servicecomb/saga/core/dag/Node.java |  4 +-
 .../servicecomb/saga/core/CompensationImpl.java    |  5 ++
 .../saga/core/CompositeSagaResponseTest.java       |  8 +++
 .../servicecomb/saga/core/ForwardRecoveryTest.java |  7 ++-
 .../servicecomb/saga/core/RestOperationTest.java   | 18 ++++---
 .../servicecomb/saga/core/RetrySagaLogTest.java    | 19 +++++--
 .../saga/core/SagaExecutionComponentTestBase.java  | 44 +++++++++++----
 .../servicecomb/saga/core/SagaIntegrationTest.java | 63 ++++++++++++++--------
 .../dag/DirectedAcyclicGraphTraversalTest.java     |  4 +-
 .../saga/core/dag/GraphBuilderTest.java            | 40 ++++++++------
 .../saga/core/dag/GraphCycleDetectorTest.java      |  3 +-
 .../servicecomb/saga/format/JacksonFallback.java   | 11 ++++
 35 files changed, 366 insertions(+), 138 deletions(-)

diff --git a/saga-core/pom.xml b/saga-core/pom.xml
index fd1b47c..e460b1a 100644
--- a/saga-core/pom.xml
+++ b/saga-core/pom.xml
@@ -95,6 +95,16 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <encoding>UTF-8</encoding>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
index e46de2f..799dbbf 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
@@ -46,4 +46,9 @@ public class BackwardRecovery implements RecoveryPolicy {
       throw e;
     }
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
index f642011..a6a352b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
@@ -20,14 +20,40 @@ package org.apache.servicecomb.saga.core;
 public interface Compensation extends Operation {
 
   Compensation SAGA_START_COMPENSATION = new Compensation() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
+
+    @Override
+    public int retries() {
+      return DEFAULT_RETRIES;
+    }
   };
 
   Compensation SAGA_END_COMPENSATION = new Compensation() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
+
+    @Override
+    public int retries() {
+      return DEFAULT_RETRIES;
+    }
   };
 
   int DEFAULT_RETRIES = 3;
 
-  default int retries() {
-    return DEFAULT_RETRIES;
-  }
+  int retries();
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
index 02ab48c..84753b9 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.core;
 
 import java.util.Collection;
-import java.util.Optional;
 
 public class CompositeSagaResponse implements SagaResponse {
   private final Collection<SagaResponse> responses;
@@ -29,19 +28,35 @@ public class CompositeSagaResponse implements SagaResponse {
 
   @Override
   public boolean succeeded() {
-    return responses.stream().allMatch(SagaResponse::succeeded);
+    if (responses.size() > 0) {
+      boolean result = true;
+      for (SagaResponse response : responses) {
+        result = result && response.succeeded();
+      }
+      return result;
+    } else {
+      return false;
+    }
   }
 
   @Override
   public String body() {
-    Optional<String> reduce = responses.stream()
-        .map(SagaResponse::body)
-        .reduce((a, b) -> a + ", " + b)
-        .map(combined -> "[" + combined + "]");
-
-    return reduce.orElse("{}");
+    StringBuffer result = new StringBuffer();
+    if (responses.size() == 0) {
+      result.append("{}");
+    } else {
+      result.append("[");
+      for (SagaResponse response : responses) {
+        result.append(response.body());
+        result.append(", ");
+      }
+      result.delete(result.length()-2, result.length());
+      result.append("]");
+    }
+    return result.toString();
   }
 
+
   public Collection<SagaResponse> responses() {
     return responses;
   }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
index bd367af..e8b4f4e 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
@@ -19,7 +19,5 @@ package org.apache.servicecomb.saga.core;
 
 interface Descriptive {
 
-  default String description() {
-    return getClass().getSimpleName();
-  }
+  String description();
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
index fca68a0..8c1068b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
@@ -19,7 +19,22 @@ package org.apache.servicecomb.saga.core;
 
 public interface Fallback extends Operation {
 
-  Fallback NOP_FALLBACK = () -> TYPE_NOP;
+  Fallback NOP_FALLBACK = new Fallback() {
+    @Override
+    public String type() {
+      return TYPE_NOP;
+    }
+
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
+  };
 
   String type();
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
index c783990..75e16ee 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
@@ -50,4 +50,9 @@ public class ForwardRecovery implements RecoveryPolicy {
       throw new TransactionFailedException(ignored);
     }
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
index f4544e3..69347fc 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
@@ -65,15 +65,15 @@ public class GraphBasedSaga implements Saga {
     this.tasks = tasks;
 
     this.transactionTaskRunner = new TaskRunner(
-        traveller(sagaTaskGraph, new FromRootTraversalDirection<>()),
+        traveller(sagaTaskGraph, new 
FromRootTraversalDirection<SagaRequest>()),
         new TransactionTaskConsumer(
             tasks,
             sagaContext,
-            new ExecutorCompletionService<>(executor)));
+            new ExecutorCompletionService<Operation>(executor)));
 
     this.sagaContext = sagaContext;
     this.compensationTaskRunner = new TaskRunner(
-        traveller(sagaTaskGraph, new FromLeafTraversalDirection<>()),
+        traveller(sagaTaskGraph, new 
FromLeafTraversalDirection<SagaRequest>()),
         new CompensationTaskConsumer(tasks, sagaContext));
 
     currentTaskRunner = transactionTaskRunner;
@@ -92,9 +92,12 @@ public class GraphBasedSaga implements Saga {
         log.error("Failed to run operation", e);
         currentTaskRunner = compensationTaskRunner;
 
-        sagaContext.handleHangingTransactions(request -> {
-          tasks.get(request.task()).commit(request, 
sagaContext.responseOf(request.parents()));
-          tasks.get(request.task()).compensate(request);
+        sagaContext.handleHangingTransactions(new 
TransactionConsumer<SagaRequest>() {
+          @Override
+          public void accept(SagaRequest request) {
+            tasks.get(request.task()).commit(request, 
sagaContext.responseOf(request.parents()));
+            tasks.get(request.task()).compensate(request);
+          }
         });
       }
     } while (currentTaskRunner.hasNext());
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
index 9173eca..593aa91 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
@@ -43,4 +43,9 @@ public class LoggingRecoveryPolicy implements RecoveryPolicy {
     log.info("Completed request id={} for service {}", request.id(), 
request.serviceName());
     return response;
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
index 56ad05e..93bbda9 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.core;
 
 import static 
org.apache.servicecomb.saga.core.Compensation.SAGA_END_COMPENSATION;
 import static 
org.apache.servicecomb.saga.core.Compensation.SAGA_START_COMPENSATION;
+import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK;
 import static org.apache.servicecomb.saga.core.Operation.TYPE_NOP;
 import static org.apache.servicecomb.saga.core.SagaTask.SAGA_END_TASK;
 import static org.apache.servicecomb.saga.core.SagaTask.SAGA_START_TASK;
@@ -65,6 +66,11 @@ public class NoOpSagaRequest implements SagaRequest {
   }
 
   @Override
+  public Fallback fallback() {
+    return NOP_FALLBACK;
+  }
+
+  @Override
   public String serviceName() {
     return "Saga";
   }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
index 13b6f8a..99e760b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
@@ -23,11 +23,7 @@ public interface Operation {
   String TYPE_REST = "rest";
   SagaResponse SUCCESSFUL_SAGA_RESPONSE = new 
SuccessfulSagaResponse("success");
 
-  default SagaResponse send(String address) {
-    return SUCCESSFUL_SAGA_RESPONSE;
-  }
+  SagaResponse send(String address);
 
-  default SagaResponse send(String address, SagaResponse response) {
-    return send(address);
-  }
+  SagaResponse send(String address, SagaResponse response);
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
index 5cce22d..153eb98 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.core;
 
-import static java.util.Collections.emptyMap;
-
 import 
org.apache.servicecomb.saga.core.application.interpreter.RestRequestChecker;
 import java.util.Map;
 
@@ -33,7 +31,7 @@ public class RestOperation implements Operation {
 
     this.path = path;
     this.method = method;
-    this.params = params == null? emptyMap() : params;
+    this.params = params == null? java.util.Collections.<String, Map<String, 
String>>emptyMap() : params;
   }
 
   public String path() {
@@ -56,4 +54,14 @@ public class RestOperation implements Operation {
         ", params=" + params +
         '}';
   }
+
+  @Override
+  public SagaResponse send(String address) {
+    return SUCCESSFUL_SAGA_RESPONSE;
+  }
+
+  @Override
+  public SagaResponse send(String address, SagaResponse response) {
+    return send(address);
+  }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
index 309be44..2f76031 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.core;
 
-import java.util.function.Consumer;
-
 public interface SagaContext extends EventContext {
   boolean isCompensationStarted();
 
@@ -26,7 +24,7 @@ public interface SagaContext extends EventContext {
 
   boolean isCompensationCompleted(SagaRequest request);
 
-  void handleHangingTransactions(Consumer<SagaRequest> consumer);
+  void handleHangingTransactions(TransactionConsumer<SagaRequest> consumer);
 
   SagaResponse responseOf(String requestId);
 
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
index 610ae27..bc4253c 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
@@ -17,16 +17,16 @@
 
 package org.apache.servicecomb.saga.core;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
+
 
 import org.apache.servicecomb.saga.core.application.interpreter.FromJsonFormat;
 
@@ -86,7 +86,7 @@ public class SagaContextImpl implements SagaContext {
   }
 
   @Override
-  public void handleHangingTransactions(Consumer<SagaRequest> consumer)  {
+  public void handleHangingTransactions(TransactionConsumer<SagaRequest> 
consumer)  {
     for (Iterator<SagaRequest> iterator = 
hangingTransactions.values().iterator(); iterator.hasNext(); ) {
       consumer.accept(iterator.next());
     }
@@ -94,13 +94,19 @@ public class SagaContextImpl implements SagaContext {
 
   @Override
   public SagaResponse responseOf(String requestId) {
-    return completedTransactions.getOrDefault(requestId, 
SagaResponse.NONE_RESPONSE);
+    SagaResponse response = completedTransactions.get(requestId);
+    if (response == null) {
+      response = SagaResponse.NONE_RESPONSE;
+    }
+    return response;
   }
 
   private List<SagaResponse> responsesOf(String[] parentRequestIds) {
-    return Arrays.stream(parentRequestIds)
-        .map(this::responseOf)
-        .collect(Collectors.toList());
+    List<SagaResponse> result = new ArrayList<>();
+    for(String parentRequestId: parentRequestIds) {
+      result.add(responseOf(parentRequestId));
+    }
+    return result;
   }
 
   @Override
@@ -125,9 +131,14 @@ public class SagaContextImpl implements SagaContext {
   }
 
   private Set<String> chosenChildrenOf(String[] parentRequestIds) {
-    return Arrays.stream(parentRequestIds)
-        .map(this::responseOf)
-        .flatMap(sagaResponse -> 
childrenExtractor.fromJson(sagaResponse.body()).stream())
-        .collect(Collectors.toSet());
+    Set<String> result = new HashSet<>();
+    for(String parentRequestId: parentRequestIds) {
+      SagaResponse response = responseOf(parentRequestId);
+      Set<String> jsons = childrenExtractor.fromJson(response.body());
+      for (String json : jsons) {
+        result.add(json);
+      }
+    }
+    return result;
   }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
index ceb0ebe..ffe31d0 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
@@ -36,4 +36,9 @@ public abstract class SagaEvent implements Descriptive {
   public String json(ToJsonFormat toJsonFormat) {
     return "{}";
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
index 337abdd..4578e42 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.core;
 
-import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK;
-
 public interface SagaRequest {
 
   String PARAM_FORM = "form";
@@ -30,9 +28,7 @@ public interface SagaRequest {
 
   Compensation compensation();
 
-  default Fallback fallback() {
-    return NOP_FALLBACK;
-  }
+  Fallback fallback();
 
   String serviceName();
 
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
index 0fd2450..b6d9585 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
@@ -36,12 +36,12 @@ public class SagaTaskFactory {
     retrySagaLog = new RetrySagaLog(persistentStore, retryDelay);
   }
 
-  public Map<String, SagaTask> sagaTasks(String sagaId,
-      String requestJson,
-      RecoveryPolicy recoveryPolicy,
-      EventStore sagaLog) {
+  public Map<String, SagaTask> sagaTasks(final String sagaId,
+      final String requestJson,
+      final RecoveryPolicy recoveryPolicy,
+      final EventStore sagaLog) {
 
-    SagaLog compositeSagaLog = compositeSagaLog(sagaLog, persistentStore);
+    final SagaLog compositeSagaLog = compositeSagaLog(sagaLog, 
persistentStore);
 
     return new HashMap<String, SagaTask>() {{
       put(SagaTask.SAGA_START_TASK, new SagaStartTask(sagaId, requestJson, 
compositeSagaLog));
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
index fafb845..389f8fa 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
@@ -20,8 +20,26 @@ package org.apache.servicecomb.saga.core;
 public interface Transaction extends Operation {
 
   Transaction SAGA_START_TRANSACTION = new Transaction() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
   };
 
   Transaction SAGA_END_TRANSACTION = new Transaction() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
   };
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
new file mode 100644
index 0000000..b5bb098
--- /dev/null
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
@@ -0,0 +1,5 @@
+package org.apache.servicecomb.saga.core;
+
+public interface TransactionConsumer<T> {
+  void accept(T request);
+}
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
index 22bc874..2489117 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
@@ -59,7 +59,10 @@ public class ByLevelTraveller<T> implements Traveller<T> {
       nodes.add(node);
 
       for (Node<T> child : traversalDirection.children(node)) {
-        nodeParents.computeIfAbsent(child.id(), id -> new 
HashSet<>(traversalDirection.parents(child)));
+        // This is not thread safe
+        if (nodeParents.get(child.id()) == null) {
+          nodeParents.put(child.id(), new 
HashSet<>(traversalDirection.parents(child)));
+        }
         nodeParents.get(child.id()).remove(node);
 
         if (nodeParents.get(child.id()).isEmpty()) {
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
index 2048383..55c71c1 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
@@ -27,6 +27,7 @@ import org.apache.servicecomb.saga.core.GraphBasedSaga;
 import org.apache.servicecomb.saga.core.Saga;
 import org.apache.servicecomb.saga.core.SagaContext;
 import org.apache.servicecomb.saga.core.SagaContextImpl;
+import org.apache.servicecomb.saga.core.SagaRequest;
 import org.apache.servicecomb.saga.core.application.SagaFactory;
 import org.apache.servicecomb.saga.infrastructure.ContextAwareEventStore;
 import org.apache.servicecomb.saga.core.PersistentStore;
@@ -49,7 +50,7 @@ public class GraphBasedSagaFactory implements SagaFactory {
     this.childrenExtractor = childrenExtractor;
     this.executorService = executorService;
     this.sagaTaskFactory = new SagaTaskFactory(retryDelay, persistentStore);
-    this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<>());
+    this.graphBuilder = new GraphBuilder(new 
GraphCycleDetectorImpl<SagaRequest>());
   }
 
   @Override
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
index 578cbb9..c5cd850 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
@@ -63,10 +63,11 @@ public class GraphBuilder {
       }
     }
 
-    requestNodes.values().stream()
-        .filter((node) -> node.children().isEmpty())
-        .forEach(node -> node.addChild(leaf));
-
+    for(Node<SagaRequest> node : requestNodes.values()) {
+      if (node.children().isEmpty()) {
+        node.addChild(leaf);
+      }
+    }
     return new SingleLeafDirectedAcyclicGraph<>(root, leaf);
   }
 
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
index 6011a98..f2869b3 100644
--- 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
+++ 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
@@ -45,26 +45,32 @@ public class GraphCycleDetectorImpl<T> implements 
GraphCycleDetector<T> {
     return unvisitedNodes(nodeParents);
   }
 
+  // This method is not thread safe
   private void traverse(Queue<Node<T>> orphanNodes, Map<Node<T>, Set<Node<T>>> 
nodeParents) {
     while (!orphanNodes.isEmpty()) {
       Node<T> node = orphanNodes.poll();
 
-      node.children().forEach(child -> {
-        nodeParents.computeIfAbsent(child, n -> new HashSet<>(child.parents()))
-            .remove(node);
-
+      for(Node<T> child : node.children()) {
+        Set<Node<T>> parent = nodeParents.get(child);
+        if (parent == null) {
+          parent = new HashSet<>(child.parents());
+          nodeParents.put(child, parent);
+        }
+        parent.remove(node);
         if (nodeParents.get(child).isEmpty()) {
           orphanNodes.add(child);
         }
-      });
+      }
     }
   }
 
   private Set<Node<T>> unvisitedNodes(Map<Node<T>, Set<Node<T>>> nodeParents) {
-    return nodeParents.entrySet()
-        .parallelStream()
-        .filter(parents -> !parents.getValue().isEmpty())
-        .map(Entry::getKey)
-        .collect(Collectors.toSet());
+    Set<Node<T>> result = new HashSet<>();
+    for (Map.Entry<Node<T>, Set<Node<T>>> entry : nodeParents.entrySet()) {
+      if (!entry.getValue().isEmpty()) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
   }
 }
diff --git 
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java 
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
index 0b3be0a..3e5bcad 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
@@ -56,7 +56,9 @@ public class Node<T> {
 
   public void addChildren(Collection<Node<T>> nodes) {
     children.addAll(nodes);
-    nodes.forEach(node -> node.parents.add(this));
+    for (Node<T> node : nodes) {
+      node.parents.add(this);
+    }
   }
 
   @Override
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
index c974bc4..0077802 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
@@ -24,4 +24,9 @@ public class CompensationImpl extends RestOperation 
implements Compensation {
   public CompensationImpl(String path, String method, Map<String, Map<String, 
String>> params) {
     super(path, method, params);
   }
+
+  @Override
+  public int retries() {
+    return DEFAULT_RETRIES;
+  }
 }
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
index be4b9f4..449340c 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
@@ -22,6 +22,8 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -31,6 +33,7 @@ public class CompositeSagaResponseTest {
   private final SagaResponse response2 = Mockito.mock(SagaResponse.class);
 
   private final SagaResponse compositeSagaResponse = new 
CompositeSagaResponse(asList(response1, response2));
+  private final SagaResponse emptySagaResponse = new CompositeSagaResponse(new 
ArrayList<SagaResponse>());
 
   @Test
   public void succeededOnlyWhenAllAreSuccessful() throws Exception {
@@ -70,4 +73,9 @@ public class CompositeSagaResponseTest {
         + "}\n"
         + "]"));
   }
+
+  @Test
+  public void EmptyCompositeSagaResponse() {
+    assertThat(emptySagaResponse.body(), is("{}"));
+  }
 }
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
index a6e1dca..078f996 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
@@ -49,7 +49,12 @@ public class ForwardRecoveryTest {
   public void blowsUpWhenTaskIsNotCommittedWithFailRetryDelaySeconds() throws 
Exception {
     doThrow(Exception.class).when(transaction).send(serviceName, 
parentResponse);
 
-    Thread t = new Thread(() -> forwardRecovery.apply(sagaTask, sagaRequest, 
parentResponse));
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        forwardRecovery.apply(sagaTask, sagaRequest, parentResponse);
+      }
+    });
     t.start();
     Thread.sleep(400);
     t.interrupt();
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
index 7f1990d..900bb8b 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
@@ -23,14 +23,20 @@ import static java.util.Collections.singletonMap;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.junit.Test;
 
 public class RestOperationTest {
 
+  public static final Map<String, Map<String, String>> EMPTY_MAP_MAP = 
Collections.<String, Map<String, String>>emptyMap();
+  public static final Map<String, String> EMPTY_MAP =  Collections.<String, 
String>emptyMap();
+
   @Test
   public void blowsUpWhenGetMethodWithForm() {
     try {
-      new RestOperation("blah", "GET", singletonMap("form", emptyMap()));
+      new RestOperation("blah", "GET", singletonMap("form", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a 
body"));
@@ -40,7 +46,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenGetMethodWithJson() {
     try {
-      new RestOperation("blah", "GET", singletonMap("json", emptyMap()));
+      new RestOperation("blah", "GET", singletonMap("json", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a 
body"));
@@ -50,7 +56,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenDeleteMethodWithForm() {
     try {
-      new RestOperation("blah", "DELETE", singletonMap("form", emptyMap()));
+      new RestOperation("blah", "DELETE", singletonMap("form", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a 
body"));
@@ -60,7 +66,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenDeleteMethodWithJson() {
     try {
-      new RestOperation("blah", "DELETE", singletonMap("json", emptyMap()));
+      new RestOperation("blah", "DELETE", singletonMap("json", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a 
body"));
@@ -70,7 +76,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenMethodIsNotSupported() {
     try {
-      new RestOperation("blah", "foo", emptyMap());
+      new RestOperation("blah", "foo", EMPTY_MAP_MAP );
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("Unsupported method foo"));
@@ -80,7 +86,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenMethodIsNull() {
     try {
-      new RestOperation("blah", null, emptyMap());
+      new RestOperation("blah", null, EMPTY_MAP_MAP);
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("Unsupported method null"));
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
index 6da8f16..9f9bdc8 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -63,18 +64,26 @@ public class RetrySagaLogTest {
   public void exitOnInterruption() throws InterruptedException {
     ExecutorService executor = Executors.newSingleThreadExecutor();
 
-    Future<?> future = executor.submit(() -> {
-      doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent);
+    Future<?> future = executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        
doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent);
 
-      retrySagaLog.offer(dummyEvent);
-      interrupted = true;
+        retrySagaLog.offer(dummyEvent);
+        interrupted = true;
+      }
     });
 
     Thread.sleep(500);
 
     assertThat(future.cancel(true), is(true));
 
-    await().atMost(2, TimeUnit.SECONDS).until(() -> interrupted);
+    await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() {
+        return interrupted;
+      }
+    });
     executor.shutdown();
   }
 }
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
index b9e893f..d4f88c4 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.core;
 
 import static org.apache.servicecomb.saga.core.Operation.TYPE_REST;
-import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -30,9 +29,12 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.core.application.SagaFactory;
 import org.hamcrest.Description;
@@ -48,6 +50,7 @@ import 
org.apache.servicecomb.saga.infrastructure.EmbeddedEventStore;
 
 @SuppressWarnings("unchecked")
 public abstract class SagaExecutionComponentTestBase {
+
   private static final String requestJson = "[\n"
       + "  {\n"
       + "    \"id\": \"request-1\",\n"
@@ -88,20 +91,22 @@ public abstract class SagaExecutionComponentTestBase {
       + "  \"requests\": " + anotherRequestJson + "\n"
       + "}";
 
+  public static final Map<String, Map<String, String>> EMPTY_MAP = 
Collections.<String, Map<String, String>>emptyMap();
+
   private final SagaRequest request1 = new SagaRequestImpl(
       "request-1",
       "aaa",
       TYPE_REST,
-      new TransactionImpl("/rest/as", "post", emptyMap()),
-      new CompensationImpl("/rest/as", "delete", emptyMap())
+      new TransactionImpl("/rest/as", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/as", "delete", EMPTY_MAP)
   );
 
   private final SagaRequest request2 = new SagaRequestImpl(
       "request-2",
       "bbb",
       TYPE_REST,
-      new TransactionImpl("/rest/bs", "post", emptyMap()),
-      new CompensationImpl("/rest/bs", "delete", emptyMap())
+      new TransactionImpl("/rest/bs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/bs", "delete", EMPTY_MAP)
   );
 
   private final SagaDefinition definition1 = new SagaDefinition() {
@@ -174,10 +179,27 @@ public abstract class SagaExecutionComponentTestBase {
 
   @Test
   public void processRequestsInParallel() {
-    CompletableFuture.runAsync(() -> coordinator.run(sagaJson));
-    CompletableFuture.runAsync(() -> coordinator.run(anotherSagaJson));
+    ExecutorService executor = Executors.newFixedThreadPool(2);
 
-    waitAtMost(2, SECONDS).until(() -> eventStore.size() == 8);
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        coordinator.run(sagaJson);
+      }
+    });
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        coordinator.run(anotherSagaJson);
+      }
+    });
+
+    waitAtMost(2, SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        return eventStore.size() == 8;
+      }
+    });
 
     assertThat(eventStore, containsInAnyOrder(
         eventWith(NoOpSagaRequest.SAGA_START_REQUEST, SagaStartedEvent.class),
@@ -211,8 +233,8 @@ public abstract class SagaExecutionComponentTestBase {
   }
 
   private Matcher<SagaEvent> eventWith(
-      SagaRequest sagaRequest,
-      Class<?> type) {
+      final SagaRequest sagaRequest,
+      final Class<?> type) {
 
     return new TypeSafeMatcher<SagaEvent>() {
       @Override
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
index d005ee7..19f9802 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -54,6 +55,7 @@ import java.util.concurrent.CyclicBarrier;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.seanyinx.github.unit.scaffolding.Randomness;
@@ -116,7 +118,7 @@ public class SagaIntegrationTest {
   // root - node1 - node2 - leaf
   @Before
   public void setUp() throws Exception {
-    when(childrenExtractor.fromJson(anyString())).thenReturn(emptySet());
+    
when(childrenExtractor.fromJson(anyString())).thenReturn(Collections.<String>emptySet());
     
when(childrenExtractor.fromJson(NONE_RESPONSE.body())).thenReturn(setOf("none"));
 
     when(transaction1.send(request1.serviceName(), 
EMPTY_RESPONSE)).thenReturn(transactionResponse1);
@@ -169,20 +171,26 @@ public class SagaIntegrationTest {
     addExtraChildToNode1();
 
     // barrier to make sure the two transactions starts at the same time
-    CyclicBarrier barrier = new CyclicBarrier(2);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
     when(transaction2.send(request2.serviceName(), transactionResponse1))
         .thenAnswer(
-            withAnswer(() -> {
-              barrier.await();
-              Thread.sleep(100);
-              throw exception;
+            withAnswer(new Callable<SagaResponse>() {
+              @Override
+              public SagaResponse call() throws Exception {
+                barrier.await();
+                Thread.sleep(100);
+                throw exception;
+              }
             }));
 
     when(transaction3.send(request3.serviceName(), transactionResponse1))
         .thenAnswer(
-            withAnswer(() -> {
-              barrier.await();
-              return transactionResponse3;
+            withAnswer(new Callable<SagaResponse>() {
+              @Override
+              public SagaResponse call() throws Exception {
+                barrier.await();
+                return transactionResponse3;
+              }
             }));
 
     saga.run();
@@ -305,21 +313,27 @@ public class SagaIntegrationTest {
     addExtraChildToNode1();
 
     // barrier to make sure the two transactions starts at the same time
-    CyclicBarrier barrier = new CyclicBarrier(2);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
     when(transaction3.send(request3.serviceName(), transactionResponse1))
-        .thenAnswer(withAnswer(() -> {
-      barrier.await();
-      throw exception;
-    }));
+        .thenAnswer(withAnswer(new Callable<SagaResponse>() {
+          @Override
+          public SagaResponse call() throws Exception {
+            barrier.await();
+            throw exception;
+          }
+        }));
 
-    CountDownLatch latch = new CountDownLatch(1);
+    final CountDownLatch latch = new CountDownLatch(1);
 
     when(transaction2.send(request2.serviceName(), transactionResponse1))
-        .thenAnswer(withAnswer(() -> {
-      barrier.await();
-      latch.await();
-      return transactionResponse2;
-    })).thenReturn(transactionResponse2);
+        .thenAnswer(withAnswer(new Callable<SagaResponse>() {
+          @Override
+          public SagaResponse call() throws Exception {
+            barrier.await();
+            latch.await();
+            return transactionResponse2;
+          }
+        })).thenReturn(transactionResponse2);
 
     saga.run();
 
@@ -646,8 +660,13 @@ public class SagaIntegrationTest {
     verify(compensation2, never()).send(request2.serviceName());
   }
 
-  private Answer<SagaResponse> withAnswer(Callable<SagaResponse> callable) {
-    return invocationOnMock -> callable.call();
+  private Answer<SagaResponse> withAnswer(final Callable<SagaResponse> 
callable) {
+    return new Answer<SagaResponse>() {
+      @Override
+      public SagaResponse answer(InvocationOnMock invocation) throws Throwable 
{
+        return callable.call();
+      }
+    };
   }
 
   private EventEnvelope envelope(SagaEvent event) {
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
index beb9607..3c54fcf 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
@@ -58,7 +58,7 @@ public class DirectedAcyclicGraphTraversalTest {
 
   @Test
   public void traverseGraphOneLevelPerStepFromRoot() {
-    Traveller<String> traveller = new ByLevelTraveller<>(dag, new 
FromRootTraversalDirection<>());
+    Traveller<String> traveller = new ByLevelTraveller<>(dag, new 
FromRootTraversalDirection<String>());
 
     Collection<Node<String>> nodes = traveller.nodes();
 
@@ -80,7 +80,7 @@ public class DirectedAcyclicGraphTraversalTest {
 
   @Test
   public void traverseGraphOneLevelPerStepFromLeaf() {
-    Traveller<String> traveller = new ByLevelTraveller<>(dag, new 
FromLeafTraversalDirection<>());
+    Traveller<String> traveller = new ByLevelTraveller<>(dag, new 
FromLeafTraversalDirection<String>());
 
     Collection<Node<String>> nodes = traveller.nodes();
 
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
index 8703c7a..9a6c0f8 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
@@ -19,8 +19,6 @@ package org.apache.servicecomb.saga.core.dag;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.apache.servicecomb.saga.core.Operation.TYPE_REST;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -31,8 +29,12 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.servicecomb.saga.core.NoOpSagaRequest;
 import org.apache.servicecomb.saga.core.SagaException;
@@ -49,28 +51,30 @@ import org.apache.servicecomb.saga.core.TransactionImpl;
 @SuppressWarnings("unchecked")
 public class GraphBuilderTest {
 
+  public static final Map<String, Map<String, String>> EMPTY_MAP = 
Collections.<String, Map<String, String>>emptyMap();
+
   private final SagaRequest request1 = new SagaRequestImpl(
       "request-aaa",
       "aaa",
       TYPE_REST,
-      new TransactionImpl("/rest/as", "post", emptyMap()),
-      new CompensationImpl("/rest/as","delete", emptyMap())
+      new TransactionImpl("/rest/as", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/as","delete", EMPTY_MAP)
   );
 
   private final SagaRequest request2 = new SagaRequestImpl(
       "request-bbb",
       "bbb",
       TYPE_REST,
-      new TransactionImpl("/rest/bs", "post", emptyMap()),
-      new CompensationImpl("/rest/bs","delete", emptyMap())
+      new TransactionImpl("/rest/bs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/bs","delete", EMPTY_MAP)
   );
 
   private final SagaRequest request3 = new SagaRequestImpl(
       "request-ccc",
       "ccc",
       TYPE_REST,
-      new TransactionImpl("/rest/cs", "post", emptyMap()),
-      new CompensationImpl("/rest/cs","delete", emptyMap()),
+      new TransactionImpl("/rest/cs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/cs","delete", EMPTY_MAP),
       null,
       new String[]{"request-aaa", "request-bbb"}
   );
@@ -80,8 +84,8 @@ public class GraphBuilderTest {
       "request-duplicate-id",
       "xxx",
       TYPE_REST,
-      new TransactionImpl("/rest/xs", "post", emptyMap()),
-      new CompensationImpl("/rest/xs","delete", emptyMap())
+      new TransactionImpl("/rest/xs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/xs","delete", EMPTY_MAP)
   );
   private final SagaRequest[] duplicateRequests = {duplicateRequest, 
duplicateRequest};
 
@@ -90,14 +94,14 @@ public class GraphBuilderTest {
 
   @Before
   public void setUp() throws Exception {
-    when(detector.cycleJoints(any())).thenReturn(emptySet());
+    
when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>)any())).thenReturn((Set<Node<SagaRequest>>)
 Collections.EMPTY_SET);
   }
 
   @Test
   public void buildsGraphOfParallelRequests() {
     SingleLeafDirectedAcyclicGraph<SagaRequest> tasks = 
graphBuilder.build(requests);
 
-    Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new 
FromRootTraversalDirection<>());
+    Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new 
FromRootTraversalDirection<SagaRequest>());
     Collection<Node<SagaRequest>> nodes = traveller.nodes();
 
     traveller.next();
@@ -130,7 +134,7 @@ public class GraphBuilderTest {
   @Test
   public void blowsUpWhenGraphContainsCycle() {
     reset(detector);
-    when(detector.cycleJoints(any())).thenReturn(singleton(new Node<>(0L, 
null)));
+    when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>) 
any())).thenReturn(singleton(new Node<SagaRequest>(0L, null)));
 
     try {
       graphBuilder.build(requests);
@@ -141,8 +145,10 @@ public class GraphBuilderTest {
   }
 
   private Collection<SagaRequest> requestsOf(Collection<Node<SagaRequest>> 
nodes) {
-    return nodes.stream()
-        .map(Node::value)
-        .collect(Collectors.toList());
+    List<SagaRequest> result = new ArrayList<>();
+    for(Node<SagaRequest> node: nodes) {
+      result.add(node.value());
+    }
+    return result;
   }
 }
diff --git 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
index 934c54b..190beeb 100644
--- 
a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
+++ 
b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
@@ -52,7 +52,6 @@ public class GraphCycleDetectorTest {
     node1.addChild(node3);
 
     Set<Node<String>> nodes = detector.cycleJoints(graph);
-
     assertThat(nodes.isEmpty(), is(true));
   }
 
@@ -63,7 +62,7 @@ public class GraphCycleDetectorTest {
     node3.addChild(node1);
 
     Set<Node<String>> nodes = detector.cycleJoints(graph);
-
+   
     assertThat(nodes, contains(node1));
   }
 
diff --git 
a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
 
b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
index 443ba66..a70aafb 100644
--- 
a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
+++ 
b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.format;
 
 import org.apache.servicecomb.saga.core.Fallback;
 import org.apache.servicecomb.saga.core.Operation;
+import org.apache.servicecomb.saga.core.SagaResponse;
 import org.apache.servicecomb.saga.transports.TransportFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -60,5 +61,15 @@ public interface JacksonFallback extends Fallback, 
TransportAware {
     public Operation with(TransportFactory transport) {
       return this;
     }
+
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
   }
 }

Reply via email to