This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit da2ef4619612ad08fea78db8da58eedb11f7b0aa Author: seanyinx <[email protected]> AuthorDate: Fri Jan 19 11:06:16 2018 +0800 SCB-218 added event id to check for command uniqueness Signed-off-by: seanyinx <[email protected]> --- .../servicecomb/saga/alpha/core/Command.java | 24 ++++++++-------------- .../saga/alpha/core/CommandRepository.java | 2 +- .../servicecomb/saga/alpha/core/EventScanner.java | 23 ++++++++++++--------- .../saga/alpha/server/CommandEntityRepository.java | 10 ++++----- .../saga/alpha/server/SpringCommandRepository.java | 11 +++++++--- .../alpha/server/TxEventEnvelopeRepository.java | 2 +- .../src/main/resources/schema-postgresql.sql | 20 +++++++++++++++++- .../saga/alpha/server/AlphaIntegrationTest.java | 2 +- alpha/alpha-server/src/test/resources/schema.sql | 3 ++- 9 files changed, 57 insertions(+), 40 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index 904cc54..2716abf 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -22,6 +22,8 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; import java.util.Date; import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.Version; @@ -29,8 +31,10 @@ import javax.persistence.Version; public class Command { @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) private Long surrogateId; + private long eventId; private String serviceName; private String instanceId; private String globalTxId; @@ -48,7 +52,7 @@ public class Command { Command() { } - Command(long id, + private Command(long id, String serviceName, String instanceId, String globalTxId, @@ -58,7 +62,7 @@ public class Command { byte[] payloads, String status) { - this.surrogateId = id; + this.eventId = id; this.serviceName = serviceName; this.instanceId = instanceId; this.globalTxId = globalTxId; @@ -70,7 +74,7 @@ public class Command { this.lastModified = new Date(); } - Command(long id, + private Command(long id, String serviceName, String instanceId, String globalTxId, @@ -82,18 +86,6 @@ public class Command { this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name()); } - Command(Command command, CommandStatus status) { - this(command.surrogateId, - command.serviceName, - command.instanceId, - command.globalTxId, - command.localTxId, - command.parentTxId, - command.compensationMethod, - command.payloads, - status.name()); - } - public Command(TxEvent event) { this(event.id(), event.serviceName(), @@ -144,7 +136,7 @@ public class Command { @Override public String toString() { return "Command{" + - "surrogateId=" + surrogateId + + "eventId=" + eventId + ", serviceName='" + serviceName + '\'' + ", instanceId='" + instanceId + '\'' + ", globalTxId='" + globalTxId + '\'' + diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java index 1da033f..2bbea77 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java @@ -21,7 +21,7 @@ import java.util.List; public interface CommandRepository { - Iterable<Command> saveCompensationCommands(String globalTxId); + void saveCompensationCommands(String globalTxId); void markCommandAsDone(String globalTxId, String localTxId); diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index ea4aaf6..5a4589d 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -81,8 +81,7 @@ public class EventScanner implements Runnable { .forEach(event -> { log.info("Found uncompensated event {}", event); nextEndedEventId = event.id(); - commandRepository.saveCompensationCommands(event.globalTxId()) - .forEach(command -> nextEndedEventId = command.id()); + commandRepository.saveCompensationCommands(event.globalTxId()); }); } @@ -130,19 +129,23 @@ public class EventScanner implements Runnable { private void pollCompensationCommand(int commandPollingInterval) { scheduler.scheduleWithFixedDelay( - () -> commandRepository.findFirstCommandToCompensate() - .forEach(command -> { - log.info("Compensating transaction with globalTxId {} and localTxId {}", - command.globalTxId(), - command.localTxId()); - - omegaCallback.compensate(txStartedEventOf(command)); - }), + this::compensate, 0, commandPollingInterval, MILLISECONDS); } + private void compensate() { + commandRepository.findFirstCommandToCompensate() + .forEach(command -> { + log.info("Compensating transaction with globalTxId {} and localTxId {}", + command.globalTxId(), + command.localTxId()); + + omegaCallback.compensate(txStartedEventOf(command)); + }); + } + private TxEvent txStartedEventOf(Command command) { return new TxEvent( command.serviceName(), diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java index d7c583e..4b8c3ee 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -18,7 +18,6 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.List; -import java.util.Optional; import javax.transaction.Transactional; @@ -30,10 +29,9 @@ import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.query.Param; public interface CommandEntityRepository extends CrudRepository<Command, Long> { - Optional<Command> findByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId); @Transactional - @Modifying + @Modifying(clearAutomatically = true) @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c " + "SET c.status = :status " + "WHERE c.globalTxId = :globalTxId " @@ -47,9 +45,9 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> { // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands @Query("SELECT c FROM Command c " - + "WHERE c.surrogateId IN (" - + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId" + + "WHERE c.eventId IN (" + + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId" + ") " - + "ORDER BY c.surrogateId ASC") + + "ORDER BY c.eventId ASC") List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index 6076e54..1dabeda 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -46,9 +46,9 @@ public class SpringCommandRepository implements CommandRepository { } @Override - public Iterable<Command> saveCompensationCommands(String globalTxId) { + public void saveCompensationCommands(String globalTxId) { List<TxEvent> events = eventRepository - .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId); + .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId); Map<String, Command> commands = new LinkedHashMap<>(); @@ -57,7 +57,12 @@ public class SpringCommandRepository implements CommandRepository { } log.info("Saving compensation commands {}", commands.values()); - return commandRepository.save(commands.values()); + try { + commandRepository.save(commands.values()); + } catch (Exception e) { + log.warn("Failed to save some commands", e); + } + log.info("Saved compensation commands {}", commands.values()); } @Override diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 78c2a1d..e974527 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -48,7 +48,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " AND t2.localTxId = t.localTxId " + " AND t2.type = 'TxCompensatedEvent') " + "ORDER BY t.surrogateId ASC") - List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); + List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); @Query("SELECT t FROM TxEvent t " + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( " diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql index e84a9c3..d1c36c2 100644 --- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql +++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql @@ -11,4 +11,22 @@ CREATE TABLE IF NOT EXISTS TxEvent ( payloads bytea ); -CREATE INDEX IF NOT EXISTS running_sagas_index ON TxEvent (globalTxId, localTxId, type); +CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type); + + +CREATE TABLE IF NOT EXISTS Command ( + surrogateId BIGSERIAL PRIMARY KEY, + eventId bigint NOT NULL UNIQUE, + serviceName varchar(16) NOT NULL, + instanceId varchar(36) NOT NULL, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + compensationMethod varchar(256) NOT NULL, + payloads bytea, + status varchar(12), + lastModified timestamp(6) NOT NULL DEFAULT CURRENT_DATE, + version bigint NOT NULL, +); + +CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, type, status); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 5cff57c..a5356bb 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -65,7 +65,7 @@ import io.grpc.stub.StreamObserver; @RunWith(SpringRunner.class) @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, - properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"}) + properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"}) public class AlphaIntegrationTest { private static final int port = 8090; diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index fb396b3..71444ec 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -13,7 +13,8 @@ CREATE TABLE IF NOT EXISTS `TxEvent` ( ) DEFAULT CHARSET=utf8; CREATE TABLE IF NOT EXISTS `Command` ( - `surrogateId` bigint NOT NULL, + `surrogateId` bigint NOT NULL AUTO_INCREMENT, + `eventId` bigint NOT NULL UNIQUE, `serviceName` varchar(36) NOT NULL, `instanceId` varchar(36) NOT NULL, `globalTxId` varchar(36) NOT NULL, -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
