This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 7aafab876c21da6f50ea10aad5095d3b18bcdf66 Author: seanyinx <[email protected]> AuthorDate: Thu Jan 18 17:55:43 2018 +0800 SCB-218 updated command status to pending when compensating Signed-off-by: seanyinx <[email protected]> --- .../java/org/apache/servicecomb/saga/alpha/core/Command.java | 4 ++-- .../apache/servicecomb/saga/alpha/core/CommandStatus.java | 1 + .../saga/alpha/server/CommandEntityRepository.java | 3 ++- .../saga/alpha/server/SpringCommandRepository.java | 11 ++++++++++- .../servicecomb/saga/alpha/server/AlphaIntegrationTest.java | 12 +++++++++++- 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index b5902e1..904cc54 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -113,11 +113,11 @@ public class Command { return instanceId; } - String globalTxId() { + public String globalTxId() { return globalTxId; } - String localTxId() { + public String localTxId() { return localTxId; } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java index cdf1f6c..0c9b78b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java @@ -19,5 +19,6 @@ package org.apache.servicecomb.saga.alpha.core; public enum CommandStatus { NEW, + PENDING, DONE } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java index fffea56..d7c583e 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -45,9 +45,10 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> { List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status); + // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands @Query("SELECT c FROM Command c " + "WHERE c.surrogateId IN (" - + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status <> 'DONE' GROUP BY c1.globalTxId" + + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId" + ") " + "ORDER BY c.surrogateId ASC") List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index 18ee9ad..34b43a4 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; +import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.util.LinkedHashMap; @@ -82,7 +83,15 @@ public class SpringCommandRepository implements CommandRepository { @Override public List<Command> findFirstCommandToCompensate() { - return commandRepository + List<Command> commands = commandRepository .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST); + + commands.forEach(command -> + commandRepository.updateStatusByGlobalTxIdAndLocalTxId( + PENDING.name(), + command.globalTxId(), + command.localTxId())); + + return commands; } } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 1514869..1bfbe3c 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; @@ -55,6 +57,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.test.context.junit4.SpringRunner; import com.google.protobuf.ByteString; @@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver; @RunWith(SpringRunner.class) @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, - properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"}) + properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"}) public class AlphaIntegrationTest { private static final int port = 8090; @@ -413,4 +417,10 @@ public class AlphaIntegrationTest { return completed; } } + + @Primary + @Bean + ScheduledExecutorService scheduler() { + return Executors.newScheduledThreadPool(2); + } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
