This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit ebf8851ea7f430f80c60839ff277b4764abb223a Author: seanyinx <[email protected]> AuthorDate: Thu Jan 18 16:59:51 2018 +0800 SCB-218 excluded duplicate events during command persistence Signed-off-by: seanyinx <[email protected]> --- .../servicecomb/saga/alpha/core/Command.java | 32 ++++++++++++++++++---- .../saga/alpha/core/TxConsistentService.java | 12 +++++++- .../servicecomb/saga/alpha/core/TxEvent.java | 32 ++++++++++++++++++++-- .../servicecomb/saga/alpha/server/AlphaConfig.java | 11 +++++++- .../saga/alpha/server/SpringCommandRepository.java | 4 +-- .../alpha/server/TxEventEnvelopeRepository.java | 4 +-- alpha/alpha-server/src/test/resources/schema.sql | 2 +- 7 files changed, 83 insertions(+), 14 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index b29988b..b5902e1 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -48,7 +48,8 @@ public class Command { Command() { } - Command(String serviceName, + Command(long id, + String serviceName, String instanceId, String globalTxId, String localTxId, @@ -57,6 +58,7 @@ public class Command { byte[] payloads, String status) { + this.surrogateId = id; this.serviceName = serviceName; this.instanceId = instanceId; this.globalTxId = globalTxId; @@ -68,7 +70,8 @@ public class Command { this.lastModified = new Date(); } - Command(String serviceName, + Command(long id, + String serviceName, String instanceId, String globalTxId, String localTxId, @@ -76,11 +79,12 @@ public class Command { String compensationMethod, byte[] payloads) { - this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name()); + this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name()); } Command(Command command, CommandStatus status) { - this(command.serviceName, + this(command.surrogateId, + command.serviceName, command.instanceId, command.globalTxId, command.localTxId, @@ -91,7 +95,8 @@ public class Command { } public Command(TxEvent event) { - this(event.serviceName(), + this(event.id(), + event.serviceName(), event.instanceId(), event.globalTxId(), event.localTxId(), @@ -131,4 +136,21 @@ public class Command { String status() { return status; } + + long id() { + return surrogateId; + } + + @Override + public String toString() { + return "Command{" + + "surrogateId=" + surrogateId + + ", serviceName='" + serviceName + '\'' + + ", instanceId='" + instanceId + '\'' + + ", globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", parentTxId='" + parentTxId + '\'' + + ", compensationMethod='" + compensationMethod + '\'' + + '}'; + } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 8f88735..61beea5 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -52,15 +52,25 @@ public class TxConsistentService { }}; private final ExecutorService executor = Executors.newSingleThreadExecutor(); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduler; public TxConsistentService(TxEventRepository eventRepository, CommandRepository commandRepository, OmegaCallback omegaCallback, int commandPollingInterval) { + + this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor()); + } + + public TxConsistentService(TxEventRepository eventRepository, + CommandRepository commandRepository, + OmegaCallback omegaCallback, + int commandPollingInterval, + ScheduledExecutorService scheduler) { this.eventRepository = eventRepository; this.commandRepository = commandRepository; this.omegaCallback = omegaCallback; + this.scheduler = scheduler; scheduleCompensationCommandPolling(commandPollingInterval); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 0ff2299..b654689 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -65,7 +65,34 @@ public class TxEvent { String type, String compensationMethod, byte[] payloads) { - this.surrogateId = -1L; + this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + } + + public TxEvent( + long id, + String serviceName, + String instanceId, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + byte[] payloads) { + this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + } + + TxEvent(Long surrogateId, + String serviceName, + String instanceId, + Date creationTime, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + byte[] payloads) { + + this.surrogateId = surrogateId; this.serviceName = serviceName; this.instanceId = instanceId; this.creationTime = creationTime; @@ -120,7 +147,8 @@ public class TxEvent { @Override public String toString() { return "TxEvent{" + - "serviceName='" + serviceName + '\'' + + "surrogateId=" + surrogateId + + ", serviceName='" + serviceName + '\'' + ", instanceId='" + instanceId + '\'' + ", creationTime=" + creationTime + ", globalTxId='" + globalTxId + '\'' + diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 9c34738..3740581 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.PostConstruct; @@ -65,8 +67,14 @@ class AlphaConfig { } @Bean + ScheduledExecutorService compensationScheduler() { + return Executors.newSingleThreadScheduledExecutor(); + } + + @Bean TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port, @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval, + ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, OmegaCallback omegaCallback, @@ -76,7 +84,8 @@ class AlphaConfig { eventRepository, commandRepository, omegaCallback, - commandPollingInterval); + commandPollingInterval, + scheduler); ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks); new Thread(startable::start).start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index a335849..18ee9ad 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -21,7 +21,7 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -61,7 +61,7 @@ public class SpringCommandRepository implements CommandRepository { List<TxEvent> events = eventRepository .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId); - Map<String, Command> commands = new HashMap<>(); + Map<String, Command> commands = new LinkedHashMap<>(); for (TxEvent event : events) { commands.computeIfAbsent(event.localTxId(), k -> new Command(event)); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 71d5f1b..09aec6f 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -54,8 +54,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { ) List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); - @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" - + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" + @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" + ") FROM TxEvent t " + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( " + " SELECT t1.globalTxId" diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index c6d66de..fb396b3 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `TxEvent` ( ) DEFAULT CHARSET=utf8; CREATE TABLE IF NOT EXISTS `Command` ( - `surrogateId` bigint NOT NULL AUTO_INCREMENT, + `surrogateId` bigint NOT NULL, `serviceName` varchar(36) NOT NULL, `instanceId` varchar(36) NOT NULL, `globalTxId` varchar(36) NOT NULL, -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
