This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 9a401719de2db3e81e6fa04b2d36ffa6305c0460 Author: seanyinx <[email protected]> AuthorDate: Thu Jan 18 15:07:29 2018 +0800 SCB-218 updated schemas accordingly due to change of ORM tech Signed-off-by: seanyinx <[email protected]> --- .../servicecomb/saga/alpha/core/Command.java | 17 +++++++ .../servicecomb/saga/alpha/core/TxEvent.java | 5 ++ .../saga/alpha/server/CommandEntity.java | 55 ---------------------- .../saga/alpha/server/CommandEntityRepository.java | 25 +++++----- .../saga/alpha/server/SpringCommandRepository.java | 26 ++++------ .../saga/alpha/server/SpringTxEventRepository.java | 2 +- .../alpha/server/TxEventEnvelopeRepository.java | 33 +++++++------ alpha/alpha-server/src/test/resources/schema.sql | 15 ++++++ 8 files changed, 80 insertions(+), 98 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index c852c16..b29988b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -19,7 +19,18 @@ package org.apache.servicecomb.saga.alpha.core; import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; +import java.util.Date; + +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Version; + +@Entity public class Command { + + @Id + private Long surrogateId; + private String serviceName; private String instanceId; private String globalTxId; @@ -29,6 +40,11 @@ public class Command { private byte[] payloads; private String status; + private Date lastModified; + + @Version + private int version; + Command() { } @@ -49,6 +65,7 @@ public class Command { this.compensationMethod = compensationMethod; this.payloads = payloads; this.status = status; + this.lastModified = new Date(); } Command(String serviceName, diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 760dd70..0ff2299 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -65,6 +65,7 @@ public class TxEvent { String type, String compensationMethod, byte[] payloads) { + this.surrogateId = -1L; this.serviceName = serviceName; this.instanceId = instanceId; this.creationTime = creationTime; @@ -112,6 +113,10 @@ public class TxEvent { return payloads; } + public long id() { + return surrogateId; + } + @Override public String toString() { return "TxEvent{" + diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java deleted file mode 100644 index 6207002..0000000 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.saga.alpha.server; - -import java.util.Date; - -import javax.persistence.Embedded; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Version; - -import org.apache.servicecomb.saga.alpha.core.Command; -import org.apache.servicecomb.saga.alpha.core.TxEvent; - -@Entity -class CommandEntity { - @Id - private long surrogateId; - - @Embedded - private Command command; - - private Date lastModified; - - @Version - private int version; - - CommandEntity() { - } - - CommandEntity(long id, TxEvent event) { - surrogateId = id; - lastModified = new Date(); - command = new Command(event); - } - - Command command() { - return command; - } -} diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java index 9402486..fffea56 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -22,32 +22,33 @@ import java.util.Optional; import javax.transaction.Transactional; +import org.apache.servicecomb.saga.alpha.core.Command; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.query.Param; -public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> { - Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId); +public interface CommandEntityRepository extends CrudRepository<Command, Long> { + Optional<Command> findByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId); @Transactional @Modifying - @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c " - + "SET c.command.status = :status " - + "WHERE c.command.globalTxId = :globalTxId " - + "AND c.command.localTxId = :localTxId") + @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c " + + "SET c.status = :status " + + "WHERE c.globalTxId = :globalTxId " + + "AND c.localTxId = :localTxId") void updateStatusByGlobalTxIdAndLocalTxId( @Param("status") String status, @Param("globalTxId") String globalTxId, @Param("localTxId") String localTxId); - List<CommandEntity> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status); + List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status); - @Query("FROM CommandEntity c " - + "WHERE id IN (" - + " SELECT MAX(id) FROM CommandEntity c1 WHERE c1.command.status <> 'DONE' GROUP BY c1.command.globalTxId" + @Query("SELECT c FROM Command c " + + "WHERE c.surrogateId IN (" + + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status <> 'DONE' GROUP BY c1.globalTxId" + ") " - + "ORDER BY c.id ASC") - List<CommandEntity> findFirstGroupByCommandGlobalTxIdOrderByIdDesc(Pageable pageable); + + "ORDER BY c.surrogateId ASC") + List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index aac3c22..a335849 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -24,10 +24,10 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.servicecomb.saga.alpha.core.Command; import org.apache.servicecomb.saga.alpha.core.CommandRepository; +import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.springframework.data.domain.PageRequest; public class SpringCommandRepository implements CommandRepository { @@ -43,28 +43,28 @@ public class SpringCommandRepository implements CommandRepository { @Override public boolean exists(String globalTxId, String localTxId) { - return commandRepository.findByCommandGlobalTxIdAndCommandLocalTxId(globalTxId, localTxId).isPresent(); + return commandRepository.findByGlobalTxIdAndLocalTxId(globalTxId, localTxId).isPresent(); } @Override public void saveCompensationCommand(String globalTxId, String localTxId) { - TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventTypeOrderByIdAsc( + TxEvent startedEvent = eventRepository.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc( globalTxId, localTxId, TxStartedEvent.name()); - commandRepository.save(new CommandEntity(startedEvent.id(), startedEvent.event())); + commandRepository.save(new Command(startedEvent)); } @Override public void saveCompensationCommands(String globalTxId) { - List<TxEventEnvelope> events = eventRepository + List<TxEvent> events = eventRepository .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId); - Map<String, CommandEntity> commands = new HashMap<>(); + Map<String, Command> commands = new HashMap<>(); - for (TxEventEnvelope event : events) { - commands.computeIfAbsent(event.localTxId(), k -> new CommandEntity(event.id(), event.event())); + for (TxEvent event : events) { + commands.computeIfAbsent(event.localTxId(), k -> new Command(event)); } commandRepository.save(commands.values()); @@ -77,18 +77,12 @@ public class SpringCommandRepository implements CommandRepository { @Override public List<Command> findUncompletedCommands(String globalTxId) { - return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name()) - .stream() - .map(CommandEntity::command) - .collect(Collectors.toList()); + return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name()); } @Override public List<Command> findFirstCommandToCompensate() { return commandRepository - .findFirstGroupByCommandGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST) - .stream() - .map(CommandEntity::command) - .collect(Collectors.toList()); + .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index f0743e9..7c44639 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -41,7 +41,7 @@ class SpringTxEventRepository implements TxEventRepository { @Override public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) { - return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndType(globalTxId, localTxId, type); + return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(globalTxId, localTxId, type); } @Override diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 85ed954..71d5f1b 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -32,7 +32,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + "WHERE t.globalTxId = ?1 AND t.type = ?2") List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type); - TxEvent findFirstByGlobalTxIdAndLocalTxIdAndType(String globalTxId, String localTxId, String type); + TxEvent findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(String globalTxId, String localTxId, String type); @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" @@ -49,21 +49,26 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " FROM TxEvent t2 " + " WHERE t2.globalTxId = ?1 " + " AND t2.localTxId = t.localTxId " - + " AND t2.type = 'TxCompensatedEvent')" + + " AND t2.type = 'TxCompensatedEvent') " + + "ORDER BY t.surrogateId ASC" ) List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); - @Query("FROM TxEventEnvelope t " - + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( " - + " FROM TxEventEnvelope t1 " - + " WHERE t1.event.globalTxId = ?1 " - + " AND t1.event.localTxId = t.event.localTxId " - + " AND t1.event.type = 'TxEndedEvent'" + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" + + ") FROM TxEvent t " + + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( " + + " SELECT t1.globalTxId" + + " FROM TxEvent t1 " + + " WHERE t1.globalTxId = ?1 " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type = 'TxEndedEvent'" + ") AND NOT EXISTS ( " - + " FROM TxEventEnvelope t2 " - + " WHERE t2.event.globalTxId = ?1 " - + " AND t2.event.localTxId = t.event.localTxId " - + " AND t2.event.type = 'TxCompensatedEvent')" - + "ORDER BY t.id ASC ") - List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); + + " SELECT t2.globalTxId" + + " FROM TxEvent t2 " + + " WHERE t2.globalTxId = ?1 " + + " AND t2.localTxId = t.localTxId " + + " AND t2.type = 'TxCompensatedEvent') " + + "ORDER BY t.surrogateId ASC") + List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); } diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index 08945a7..c6d66de 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -11,3 +11,18 @@ CREATE TABLE IF NOT EXISTS `TxEvent` ( `payloads` varbinary(10240), PRIMARY KEY (`surrogateId`) ) DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `Command` ( + `surrogateId` bigint NOT NULL AUTO_INCREMENT, + `serviceName` varchar(36) NOT NULL, + `instanceId` varchar(36) NOT NULL, + `globalTxId` varchar(36) NOT NULL, + `localTxId` varchar(36) NOT NULL, + `parentTxId` varchar(36) DEFAULT NULL, + `compensationMethod` varchar(256) NOT NULL, + `payloads` varbinary(10240), + `status` varchar(12), + `lastModified` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + `version` bigint NOT NULL, + PRIMARY KEY (`surrogateId`) +) DEFAULT CHARSET=utf8; -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
