This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 4b896392556f6b3b9dd7f21bf08193fc82eb5640 Author: seanyinx <[email protected]> AuthorDate: Tue Jan 16 11:54:52 2018 +0800 SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless Signed-off-by: seanyinx <[email protected]> --- .../saga/alpha/core/{TxEvent.java => Command.java} | 93 +++++++++------------- .../saga/alpha/core/CommandRepository.java | 32 ++++++++ .../servicecomb/saga/alpha/core/CommandStatus.java | 23 ++++++ .../saga/alpha/core/TxConsistentService.java | 28 +++---- .../servicecomb/saga/alpha/core/TxEvent.java | 14 ++++ .../saga/alpha/core/TxConsistentServiceTest.java | 56 ++++++++++++- .../servicecomb/saga/alpha/server/AlphaConfig.java | 9 ++- .../saga/alpha/server/CommandEntity.java | 53 ++++++++++++ .../saga/alpha/server/CommandEntityRepository.java | 43 ++++++++++ .../saga/alpha/server/SpringCommandRepository.java | 78 ++++++++++++++++++ .../alpha/server/TxEventEnvelopeRepository.java | 13 +++ .../saga/omega/transaction/TransactionAspect.java | 1 + 12 files changed, 368 insertions(+), 75 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java similarity index 58% copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index 37a29f1..08f8527 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -17,98 +17,81 @@ package org.apache.servicecomb.saga.alpha.core; -import java.util.Date; - -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; - -@Entity -public class TxEvent { - @Id - @GeneratedValue(strategy = GenerationType.IDENTITY) - private Long surrogateId; +import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; +public class Command { private String serviceName; private String instanceId; - private Date creationTime; private String globalTxId; private String localTxId; private String parentTxId; - private String type; private String compensationMethod; private byte[] payloads; + private String status; - private TxEvent() { + Command() { } - public TxEvent( - String serviceName, + Command(String serviceName, String instanceId, String globalTxId, String localTxId, String parentTxId, - String type, String compensationMethod, - byte[] payloads) { - this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); - } + byte[] payloads, + String status) { - public TxEvent( - String serviceName, - String instanceId, - Date creationTime, - String globalTxId, - String localTxId, - String parentTxId, - String type, - String compensationMethod, - byte[] payloads) { this.serviceName = serviceName; this.instanceId = instanceId; - this.creationTime = creationTime; this.globalTxId = globalTxId; this.localTxId = localTxId; this.parentTxId = parentTxId; - this.type = type; this.compensationMethod = compensationMethod; this.payloads = payloads; + this.status = status; } - public String serviceName() { - return serviceName; + Command(String serviceName, + String instanceId, + String globalTxId, + String localTxId, + String parentTxId, + String compensationMethod, + byte[] payloads) { + + this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name()); } - public String instanceId() { - return instanceId; + Command(Command command, CommandStatus status) { + this(command.serviceName, + command.instanceId, + command.globalTxId, + command.localTxId, + command.parentTxId, + command.compensationMethod, + command.payloads, + status.name()); } - public Date creationTime() { - return creationTime; + public Command(TxEvent event) { + this(event.serviceName(), + event.instanceId(), + event.globalTxId(), + event.localTxId(), + event.parentTxId(), + event.compensationMethod(), + event.payloads()); } - public String globalTxId() { + String globalTxId() { return globalTxId; } - public String localTxId() { + String localTxId() { return localTxId; } - public String parentTxId() { - return parentTxId; - } - - public String type() { - return type; - } - - public String compensationMethod() { - return compensationMethod; - } - - public byte[] payloads() { - return payloads; + String status() { + return status; } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java new file mode 100644 index 0000000..915d476 --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.core; + +import java.util.List; + +public interface CommandRepository { + boolean exists(String globalTxId, String localTxId); + + void saveCompensationCommand(String globalTxId, String localTxId); + + void saveCompensationCommands(String globalTxId); + + void markCommandAsDone(String globalTxId, String localTxId); + + List<Command> findUncompletedCommands(String globalTxId); +} diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java new file mode 100644 index 0000000..cdf1f6c --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.core; + +public enum CommandStatus { + NEW, + DONE +} diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 5dc5788..560096f 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -17,7 +17,6 @@ package org.apache.servicecomb.saga.alpha.core; -import static java.util.Collections.emptySet; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; @@ -26,10 +25,8 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -40,6 +37,7 @@ public class TxConsistentService { private static final byte[] EMPTY_PAYLOAD = new byte[0]; private final TxEventRepository eventRepository; + private final CommandRepository commandRepository; private final OmegaCallback omegaCallback; private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{ put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event)); @@ -47,11 +45,13 @@ public class TxConsistentService { put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event)); }}; - private final Map<String, Set<String>> eventsToCompensate = new HashMap<>(); private final ExecutorService executor = Executors.newSingleThreadExecutor(); - public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) { + public TxConsistentService(TxEventRepository eventRepository, + CommandRepository commandRepository, + OmegaCallback omegaCallback) { this.eventRepository = eventRepository; + this.commandRepository = commandRepository; this.omegaCallback = omegaCallback; } @@ -68,7 +68,7 @@ public class TxConsistentService { private void compensateIfAlreadyAborted(TxEvent event) { if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) { - eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId()); + commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId()); TxEvent correspondingStartedEvent = eventRepository .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name()); @@ -77,7 +77,7 @@ public class TxConsistentService { } private boolean isCompensationScheduled(TxEvent event) { - return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId()); + return commandRepository.exists(event.globalTxId(), event.localTxId()); } private void compensate(TxEvent event) { @@ -85,8 +85,7 @@ public class TxConsistentService { events.removeIf(this::isCompensationScheduled); - Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()); - events.forEach(e -> localTxIds.add(e.localTxId())); + commandRepository.saveCompensationCommands(event.globalTxId()); events.forEach(omegaCallback::compensate); } @@ -94,13 +93,10 @@ public class TxConsistentService { // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, // unless we ask user to specify a name for each participant in the global TX in @Compensable private void updateCompensateStatus(TxEvent event) { - Set<String> events = eventsToCompensate.get(event.globalTxId()); - if (events != null) { - events.remove(event.localTxId()); - if (events.isEmpty()) { - markGlobalTxEnd(event); - eventsToCompensate.remove(event.globalTxId()); - } + commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId()); + if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty() + && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { + markGlobalTxEnd(event); } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 37a29f1..760dd70 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -111,4 +111,18 @@ public class TxEvent { public byte[] payloads() { return payloads; } + + @Override + public String toString() { + return "TxEvent{" + + "serviceName='" + serviceName + '\'' + + ", instanceId='" + instanceId + '\'' + + ", creationTime=" + creationTime + + ", globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", parentTxId='" + parentTxId + '\'' + + ", type='" + type + '\'' + + ", compensationMethod='" + compensationMethod + '\'' + + '}'; + } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 99667e7..8ae60a3 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.core; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; @@ -34,16 +35,20 @@ import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; +import java.util.Deque; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.stream.Collectors; import org.apache.servicecomb.saga.common.EventType; import org.junit.Test; public class TxConsistentServiceTest { - private final List<TxEvent> events = new ArrayList<>(); + private final Deque<TxEvent> events = new ConcurrentLinkedDeque<>(); private final TxEventRepository eventRepository = new TxEventRepository() { @Override public void save(TxEvent event) { @@ -92,6 +97,51 @@ public class TxConsistentServiceTest { } }; + private final List<Command> commands = new ArrayList<>(); + private final CommandRepository commandRepository = new CommandRepository() { + @Override + public boolean exists(String globalTxId, String localTxId) { + return commands.stream() + .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())); + } + + @Override + public void saveCompensationCommand(String globalTxId, String localTxId) { + TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name()); + commands.add(new Command(event)); + } + + @Override + public void saveCompensationCommands(String globalTxId) { + List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId); + + Map<String, Command> commandMap = new HashMap<>(); + + for (TxEvent event : events) { + commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event)); + } + + commands.addAll(commandMap.values()); + } + + @Override + public void markCommandAsDone(String globalTxId, String localTxId) { + for (int i = 0; i < commands.size(); i++) { + Command command = commands.get(i); + if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) { + commands.set(i, new Command(command, DONE)); + } + } + } + + @Override + public List<Command> findUncompletedCommands(String globalTxId) { + return commands.stream() + .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status())) + .collect(Collectors.toList()); + } + }; + private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); @@ -104,7 +154,7 @@ public class TxConsistentServiceTest { private final OmegaCallback omegaCallback = event -> compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads())); - private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback); + private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback); @Test public void persistEventOnArrival() throws Exception { @@ -150,7 +200,7 @@ public class TxConsistentServiceTest { consistentService.handle(compensateEvent1); await().atMost(1, SECONDS).until(() -> events.size() == 8); - assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name())); + assertThat(events.pollLast().type(), is(SagaEndedEvent.name())); } @Test diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index bb4ba89..00dfe27 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.PostConstruct; +import org.apache.servicecomb.saga.alpha.core.CommandRepository; import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner; @@ -59,12 +60,18 @@ class AlphaConfig { } @Bean + CommandRepository springCommandRepository(TxEventEnvelopeRepository eventRepo, CommandEntityRepository commandRepository) { + return new SpringCommandRepository(eventRepo, commandRepository); + } + + @Bean TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port, TxEventRepository eventRepository, + CommandRepository commandRepository, OmegaCallback omegaCallback, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { - TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback); + TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback); ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks); new Thread(startable::start).start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java new file mode 100644 index 0000000..3eac681 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import java.util.Date; + +import javax.persistence.Embedded; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Version; + +import org.apache.servicecomb.saga.alpha.core.Command; +import org.apache.servicecomb.saga.alpha.core.TxEvent; + +@Entity +class CommandEntity { + @Id + @GeneratedValue + private long surrogateId; + + @Embedded + private Command command; + + private Date lastModified; + + @Version + private int version; + + CommandEntity() { + } + + CommandEntity(long id, TxEvent event) { + surrogateId = id; + lastModified = new Date(); + command = new Command(event); + } +} diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java new file mode 100644 index 0000000..4b7309e --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import java.util.List; +import java.util.Optional; + +import org.apache.servicecomb.saga.alpha.core.Command; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; + +public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> { + Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId); + + @Modifying + @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c " + + "SET c.command.status = :status " + + "WHERE c.command.globalTxId = :globalTxId " + + "AND c.command.localTxId = :localTxId") + void updateStatusByGlobalTxIdAndLocalTxId( + @Param("status") String status, + @Param("globalTxId") String globalTxId, + @Param("localTxId") String localTxId); + + List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status); +} diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java new file mode 100644 index 0000000..9281b7e --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; +import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; +import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.servicecomb.saga.alpha.core.Command; +import org.apache.servicecomb.saga.alpha.core.CommandRepository; + +public class SpringCommandRepository implements CommandRepository { + private final TxEventEnvelopeRepository eventRepository; + private final CommandEntityRepository commandRepository; + + SpringCommandRepository(TxEventEnvelopeRepository eventRepository, CommandEntityRepository commandRepository) { + this.eventRepository = eventRepository; + this.commandRepository = commandRepository; + } + + @Override + public boolean exists(String globalTxId, String localTxId) { + return commandRepository.findByCommandGlobalTxIdAndCommandLocalTxId(globalTxId, localTxId).isPresent(); + } + + @Override + public void saveCompensationCommand(String globalTxId, String localTxId) { + TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType( + globalTxId, + localTxId, + TxStartedEvent.name()); + + commandRepository.save(new CommandEntity(startedEvent.id(), startedEvent.event())); + } + + @Override + public void saveCompensationCommands(String globalTxId) { + List<TxEventEnvelope> events = eventRepository + .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId); + + Map<String, CommandEntity> commands = new HashMap<>(); + + for (TxEventEnvelope event : events) { + commands.computeIfAbsent(event.localTxId(), k -> new CommandEntity(event.id(), event.event())); + } + + commandRepository.save(commands.values()); + } + + @Override + public void markCommandAsDone(String globalTxId, String localTxId) { + commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId); + } + + @Override + public List<Command> findUncompletedCommands(String globalTxId) { + return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name()); + } +} diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index bdf82f1..fcb7c00 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -52,4 +52,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " AND t2.type = 'TxCompensatedEvent')" ) List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); + + @Query("FROM TxEventEnvelope t " + + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( " + + " FROM TxEventEnvelope t1 " + + " WHERE t1.event.globalTxId = ?1 " + + " AND t1.event.localTxId = t.event.localTxId " + + " AND t1.event.type = 'TxEndedEvent'" + + ") AND NOT EXISTS ( " + + " FROM TxEventEnvelope t2 " + + " WHERE t2.event.globalTxId = ?1 " + + " AND t2.event.localTxId = t.event.localTxId " + + " AND t2.event.type = 'TxCompensatedEvent')") + List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java index cead07a..5a61dc7 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -68,6 +68,7 @@ public class TransactionAspect { } LOG.debug("Updated context {} for compensable method {} ", context, method.toString()); + // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout()); try { -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
