This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit f075bd5f27c76a073e253095a99a3b0b73153cfc Author: seanyinx <[email protected]> AuthorDate: Fri Jan 19 16:35:03 2018 +0800 SCB-218 locked commands to avoid duplicate compensation callback Signed-off-by: seanyinx <[email protected]> --- .../servicecomb/saga/alpha/core/Command.java | 2 +- .../servicecomb/saga/alpha/core/EventScanner.java | 28 +++++------ .../saga/alpha/core/TxEventRepository.java | 2 + .../saga/alpha/core/TxConsistentServiceTest.java | 4 ++ alpha/alpha-server/pom.xml | 4 +- .../servicecomb/saga/alpha/server/AlphaConfig.java | 2 - .../saga/alpha/server/CommandEntityRepository.java | 3 ++ .../saga/alpha/server/SpringCommandRepository.java | 17 ++++--- .../saga/alpha/server/SpringTxEventRepository.java | 5 ++ .../alpha/server/TxEventEnvelopeRepository.java | 17 ++++++- .../saga/alpha/server/AlphaIntegrationTest.java | 36 +++++++++++++- alpha/alpha-server/src/test/resources/schema.sql | 55 +++++++++++----------- 12 files changed, 119 insertions(+), 56 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index 2716abf..49c1756 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -47,7 +47,7 @@ public class Command { private Date lastModified; @Version - private int version; + private long version; Command() { } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 5a4589d..f9fa3be 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -38,7 +38,6 @@ public class EventScanner implements Runnable { private final TxEventRepository eventRepository; private final CommandRepository commandRepository; private final OmegaCallback omegaCallback; - private final int commandPollingInterval; private final int eventPollingInterval; private long nextEndedEventId; @@ -48,20 +47,17 @@ public class EventScanner implements Runnable { TxEventRepository eventRepository, CommandRepository commandRepository, OmegaCallback omegaCallback, - int commandPollingInterval, int eventPollingInterval) { this.scheduler = scheduler; this.eventRepository = eventRepository; this.commandRepository = commandRepository; this.omegaCallback = omegaCallback; - this.commandPollingInterval = commandPollingInterval; this.eventPollingInterval = eventPollingInterval; } @Override public void run() { - pollCompensationCommand(commandPollingInterval); pollEvents(); } @@ -69,7 +65,9 @@ public class EventScanner implements Runnable { scheduler.scheduleWithFixedDelay( () -> { saveUncompensatedEventsToCommands(); + compensate(); updateCompensatedCommands(); + deleteDuplicateSagaEndedEvents(); }, 0, eventPollingInterval, @@ -94,6 +92,14 @@ public class EventScanner implements Runnable { }); } + private void deleteDuplicateSagaEndedEvents() { + try { + eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); + } catch (Exception e) { + log.warn("Failed to delete duplicate SagaEndedEvent", e); + } + } + // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, // unless we ask user to specify a name for each participant in the global TX in @Compensable private void updateCompensationStatus(TxEvent event) { @@ -102,9 +108,11 @@ public class EventScanner implements Runnable { event.globalTxId(), event.localTxId()); - if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty() - && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { + markSagaEnded(event); + } + private void markSagaEnded(TxEvent event) { + if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { markGlobalTxEnd(event); } } @@ -127,14 +135,6 @@ public class EventScanner implements Runnable { EMPTY_PAYLOAD); } - private void pollCompensationCommand(int commandPollingInterval) { - scheduler.scheduleWithFixedDelay( - this::compensate, - 0, - commandPollingInterval, - MILLISECONDS); - } - private void compensate() { commandRepository.findFirstCommandToCompensate() .forEach(command -> { diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index d793de2..b61aa06 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -28,4 +28,6 @@ public interface TxEventRepository { List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type); Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type); + + void deleteDuplicateEvents(String type); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 473501e..231d5bf 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -64,6 +64,10 @@ public class TxConsistentServiceTest { public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) { return Optional.empty(); } + + @Override + public void deleteDuplicateEvents(String type) { + } }; private final String globalTxId = UUID.randomUUID().toString(); diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml index ae894b8..a05177a 100644 --- a/alpha/alpha-server/pom.xml +++ b/alpha/alpha-server/pom.xml @@ -82,8 +82,8 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.h2database</groupId> - <artifactId>h2</artifactId> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> <scope>test</scope> </dependency> <dependency> diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 73298be..769ee5a 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -74,7 +74,6 @@ class AlphaConfig { @Bean TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port, - @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval, @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval, ScheduledExecutorService scheduler, TxEventRepository eventRepository, @@ -86,7 +85,6 @@ class AlphaConfig { eventRepository, commandRepository, omegaCallback, - commandPollingInterval, eventPollingInterval).run(); TxConsistentService consistentService = new TxConsistentService(eventRepository); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java index 4b8c3ee..17df477 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -19,10 +19,12 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; +import javax.persistence.LockModeType; import javax.transaction.Transactional; import org.apache.servicecomb.saga.alpha.core.Command; import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Lock; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; @@ -44,6 +46,7 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> { List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status); // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands + @Lock(LockModeType.OPTIMISTIC) @Query("SELECT c FROM Command c " + "WHERE c.eventId IN (" + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId" diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index 1dabeda..8241d81 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -26,6 +26,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import javax.transaction.Transactional; + import org.apache.servicecomb.saga.alpha.core.Command; import org.apache.servicecomb.saga.alpha.core.CommandRepository; import org.apache.servicecomb.saga.alpha.core.TxEvent; @@ -56,13 +58,15 @@ public class SpringCommandRepository implements CommandRepository { commands.computeIfAbsent(event.localTxId(), k -> new Command(event)); } - log.info("Saving compensation commands {}", commands.values()); - try { - commandRepository.save(commands.values()); - } catch (Exception e) { - log.warn("Failed to save some commands", e); + for (Command command : commands.values()) { + log.info("Saving compensation command {}", command); + try { + commandRepository.save(command); + } catch (Exception e) { + log.warn("Failed to save some command {}", command); + } + log.info("Saved compensation command {}", command); } - log.info("Saved compensation commands {}", commands.values()); } @Override @@ -75,6 +79,7 @@ public class SpringCommandRepository implements CommandRepository { return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name()); } + @Transactional @Override public List<Command> findFirstCommandToCompensate() { List<Command> commands = commandRepository diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 4108aa5..ad32148 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -50,4 +50,9 @@ class SpringTxEventRepository implements TxEventRepository { public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) { return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id); } + + @Override + public void deleteDuplicateEvents(String type) { + eventRepo.deleteByType(type); + } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index e974527..2e52fef 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -20,8 +20,11 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; import java.util.Optional; +import javax.transaction.Transactional; + import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; @@ -51,7 +54,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); @Query("SELECT t FROM TxEvent t " - + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( " + + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( " + " SELECT t1.globalTxId" + " FROM TxEvent t1 " + " WHERE t1.globalTxId = t.globalTxId " @@ -59,11 +62,21 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + ") AND NOT EXISTS ( " + " SELECT t2.globalTxId" + " FROM TxEvent t2 " - + " WHERE t2.globalTxId = ?1 " + + " WHERE t2.globalTxId = t.globalTxId " + " AND t2.localTxId = t.localTxId " + " AND t2.type = 'TxCompensatedEvent') " + "ORDER BY t.surrogateId ASC") List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable); Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId); + + @Transactional + @Modifying(clearAutomatically = true) + @Query("DELETE FROM TxEvent t " + + "WHERE t.type = ?1 AND t.surrogateId NOT IN (" + + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 " + + " WHERE t1.type = ?1" + + " GROUP BY t1.globalTxId" + + ")") + void deleteByType(String type); } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index a5356bb..a928443 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -36,9 +36,13 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; +import javax.annotation.PostConstruct; + +import org.apache.servicecomb.saga.alpha.core.CommandRepository; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.apache.servicecomb.saga.alpha.core.TxEventRepository; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; @@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver; @RunWith(SpringRunner.class) @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, - properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"}) + properties = {"alpha.server.port=8090", "alpha.event.pollingInterval=1"}) public class AlphaIntegrationTest { private static final int port = 8090; @@ -93,6 +97,18 @@ public class AlphaIntegrationTest { private TxEventEnvelopeRepository eventRepo; @Autowired + private TxEventRepository eventRepository; + + @Autowired + private CommandRepository commandRepository; + + @Autowired + private CommandEntityRepository commandEntityRepository; + + @Autowired + private OmegaCallback omegaCallback; + + @Autowired private Map<String, Map<String, OmegaCallback>> omegaCallbacks; @Autowired @@ -115,6 +131,19 @@ public class AlphaIntegrationTest { @After public void after() throws Exception { blockingStub.onDisconnected(serviceConfig); + deleteAllTillSuccessful(); + } + + public void deleteAllTillSuccessful() { + boolean deleted = false; + do { + try { + eventRepo.deleteAll(); + commandEntityRepository.deleteAll(); + deleted = true; + } catch (Exception ignored) { + } + } while (!deleted); } @Test @@ -413,4 +442,9 @@ public class AlphaIntegrationTest { return completed; } } + + @PostConstruct + void init() { +// new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run(); + } } diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index 71444ec..344fdda 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -1,29 +1,28 @@ -CREATE TABLE IF NOT EXISTS `TxEvent` ( - `surrogateId` bigint NOT NULL AUTO_INCREMENT, - `serviceName` varchar(36) NOT NULL, - `instanceId` varchar(36) NOT NULL, - `creationTime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), - `globalTxId` varchar(36) NOT NULL, - `localTxId` varchar(36) NOT NULL, - `parentTxId` varchar(36) DEFAULT NULL, - `type` varchar(50) NOT NULL, - `compensationMethod` varchar(256) NOT NULL, - `payloads` varbinary(10240), - PRIMARY KEY (`surrogateId`) -) DEFAULT CHARSET=utf8; +CREATE TABLE IF NOT EXISTS TxEvent ( + surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, + serviceName varchar(36) NOT NULL, + instanceId varchar(36) NOT NULL, + creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + type varchar(50) NOT NULL, + compensationMethod varchar(256) NOT NULL, + payloads varbinary(10240), +-- version bigint NOT NULL +); -CREATE TABLE IF NOT EXISTS `Command` ( - `surrogateId` bigint NOT NULL AUTO_INCREMENT, - `eventId` bigint NOT NULL UNIQUE, - `serviceName` varchar(36) NOT NULL, - `instanceId` varchar(36) NOT NULL, - `globalTxId` varchar(36) NOT NULL, - `localTxId` varchar(36) NOT NULL, - `parentTxId` varchar(36) DEFAULT NULL, - `compensationMethod` varchar(256) NOT NULL, - `payloads` varbinary(10240), - `status` varchar(12), - `lastModified` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), - `version` bigint NOT NULL, - PRIMARY KEY (`surrogateId`) -) DEFAULT CHARSET=utf8; +CREATE TABLE IF NOT EXISTS Command ( + surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, + eventId bigint NOT NULL UNIQUE, + serviceName varchar(36) NOT NULL, + instanceId varchar(36) NOT NULL, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + compensationMethod varchar(256) NOT NULL, + payloads varbinary(10240), + status varchar(12), + lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + version bigint NOT NULL +); -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
