[ https://issues.apache.org/jira/browse/SCB-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641270#comment-16641270 ]
ASF GitHub Bot commented on SCB-915: ------------------------------------ oliugian closed pull request #313: SCB-915:saga alpha event scanner optimization URL: https://github.com/apache/incubator-servicecomb-saga/pull/313 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java index 2bbea774..6e50619b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java @@ -21,11 +21,15 @@ public interface CommandRepository { - void saveCompensationCommands(String globalTxId); + void saveCompensationCommands(String globalTxId, String localTxId); void markCommandAsDone(String globalTxId, String localTxId); + void markCommandAsPending(String globalTxId, String localTxId); + List<Command> findUncompletedCommands(String globalTxId); - List<Command> findFirstCommandToCompensate(); + List<Command> findAllCommandsToCompensate(); + + List<Command> findPendingCommands(); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunner.java similarity index 50% rename from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunner.java index 54e78f7b..058e570c 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunner.java @@ -20,22 +20,35 @@ import static java.util.Collections.emptyMap; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CompositeOmegaCallback implements OmegaCallback { +public class CompositeOmegaCallbackRunner implements OmegaCallback, Callable<List<TxEvent>> { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map<String, Map<String, OmegaCallback>> callbacks; + private final List<TxEvent> txEvents; - public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) { + public CompositeOmegaCallbackRunner(Map<String, Map<String, OmegaCallback>> callbacks, + List<TxEvent> txEvents) { this.callbacks = callbacks; + this.txEvents = txEvents; + } + + @Override + public List<TxEvent> call() { + return compensateAllEvents(txEvents); } @Override public void compensate(TxEvent event) { - Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap()); + Map<String, OmegaCallback> serviceCallbacks = callbacks + .getOrDefault(event.serviceName(), emptyMap()); if (serviceCallbacks.isEmpty()) { throw new AlphaException("No such omega callback found for service " + event.serviceName()); @@ -43,7 +56,8 @@ public void compensate(TxEvent event) { OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId()); if (omegaCallback == null) { - LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId()); + LOG.info("Cannot find the service with the instanceId {}, call the other instance.", + event.instanceId()); omegaCallback = serviceCallbacks.values().iterator().next(); } @@ -54,4 +68,37 @@ public void compensate(TxEvent event) { throw e; } } + + @Override + public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) { + List<TxEvent> resultTxEvents = new ArrayList<>(); + for (TxEvent txEvent : txEvents) { + try { + LOG.info("compensating event with globalTxId: {} localTxId: {}", txEvent.globalTxId(), + txEvent.localTxId()); + this.compensate(txEvent); + resultTxEvents.add(txEvent); + } catch (AlphaException ae) { + LOG.error("compensate event with globalTxId: {} localTxId: {} failed,error message is {}", + txEvent.globalTxId(), txEvent.localTxId(), ae); + break; + } catch (Exception e) { + logError(txEvent, e); + } + } + + return resultTxEvents; + } + + private void logError(TxEvent event, Exception e) { + LOG.error( + "Failed to {} service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]", + event.retries() == 0 ? "compensate" : "retry", + event.serviceName(), + event.instanceId(), + event.retries() == 0 ? event.compensationMethod() : event.retryMethod(), + event.globalTxId(), + event.localTxId(), + e); + } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 0a15ad0e..1ef4176f 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -19,12 +19,14 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -36,6 +38,7 @@ @EnableKamon public class EventScanner implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final byte[] EMPTY_PAYLOAD = new byte[0]; @@ -52,9 +55,6 @@ private final int eventPollingInterval; - private long nextEndedEventId; - - private long nextCompensatedEventId; public EventScanner(ScheduledExecutorService scheduler, TxEventRepository eventRepository, @@ -79,21 +79,26 @@ private void pollEvents() { scheduler.scheduleWithFixedDelay( () -> { updateTimeoutStatus(); - findTimeoutEvents(); + findAllTimeoutEvents(); abortTimeoutEvents(); saveUncompensatedEventsToCommands(); compensate(); updateCompensatedCommands(); + markSagaEndedForNoTxEnd(); deleteDuplicateSagaEndedEvents(); - updateTransactionStatus(); + dumpColdData(); }, 0, eventPollingInterval, MILLISECONDS); } - @Trace("findTimeoutEvents") - private void findTimeoutEvents() { + private void updateTimeoutStatus() { + timeoutRepository.markTimeoutAsDone(); + } + + @Trace("findAllTimeoutEvents") + private void findAllTimeoutEvents() { eventRepository.findTimeoutEvents() .forEach(event -> { LOG.info("Found timeout event {}", event); @@ -101,36 +106,38 @@ private void findTimeoutEvents() { }); } - private void updateTimeoutStatus() { - timeoutRepository.markTimeoutAsDone(); + @Trace("abortTimeoutEvents") + private void abortTimeoutEvents() { + timeoutRepository.findTimeouts().forEach(timeout -> { + LOG.info("Found timeout event {} to abort", timeout); + eventRepository.save(toTxAbortedEvent(timeout)); + }); } @Trace("saveUncompensatedEventsToCommands") private void saveUncompensatedEventsToCommands() { - eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name()) + eventRepository.findNeedToCompensateTxs() .forEach(event -> { LOG.info("Found uncompensated event {}", event); - nextEndedEventId = event.id(); - commandRepository.saveCompensationCommands(event.globalTxId()); + commandRepository.saveCompensationCommands(event.globalTxId(), event.localTxId()); }); } - @Trace("updateCompensationStatus") - private void updateCompensatedCommands() { - eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId) - .ifPresent(event -> { - LOG.info("Found compensated event {}", event); - nextCompensatedEventId = event.id(); - updateCompensationStatus(event); - }); + @Trace("compensate") + private void compensate() { + List<TxEvent> compensateTxEvents = new ArrayList<>(); + commandRepository.findAllCommandsToCompensate() + .forEach(command -> + compensateTxEvents.add(txStartedEventOf(command)) + ); + omegaCallback.compensateAllEvents(compensateTxEvents).forEach( + event -> commandRepository.markCommandAsPending(event.globalTxId(), event.localTxId())); } - @Trace("deleteDuplicateSagaEndedEvents") - private void deleteDuplicateSagaEndedEvents() { - try { - eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); - } catch (Exception e) { - LOG.warn("Failed to delete duplicate event", e); + private void markSagaEnded(TxEvent event) { + if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { + LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId()); + markGlobalTxEndWithEvent(event); } } @@ -139,43 +146,45 @@ private void updateCompensationStatus(TxEvent event) { LOG.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId()); - markSagaEnded(event); } - @Trace("abortTimeoutEvents") - private void abortTimeoutEvents() { - timeoutRepository.findFirstTimeout().forEach(timeout -> { - LOG.info("Found timeout event {} to abort", timeout); - - eventRepository.save(toTxAbortedEvent(timeout)); + @Trace("updateCompensatedCommands") + private void updateCompensatedCommands() { + commandRepository.findPendingCommands().forEach(command -> + eventRepository.findCompensatedDoneTxs(command.globalTxId(), command.localTxId()) + .forEach(event -> + { + LOG.info("Found compensated event {}", event); + updateCompensationStatus(event); + })); + } - if (timeout.type().equals(TxStartedEvent.name())) { - eventRepository.findTxStartedEvent(timeout.globalTxId(), timeout.localTxId()) - .ifPresent(omegaCallback::compensate); - } - }); + private void markGlobalTxEndWithEvent(TxEvent event) { + eventRepository.save(toSagaEndedEvent(event)); } - @Trace("updateTransactionStatus") - private void updateTransactionStatus() { - eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents); + private void markSagaEndedForNoTxEnd() { + eventRepository.findAllFinishedTxsForNoTxEnd().forEach( + event -> { + LOG.info("Marked end of no tx end's transaction with globalTxId {}", event.globalTxId()); + markGlobalTxEndWithEvent(event); + }); } - private void markSagaEnded(TxEvent event) { - if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { - markGlobalTxEndWithEvent(event); + @Trace("deleteDuplicateSagaEndedEvents") + private void deleteDuplicateSagaEndedEvents() { + try { + eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); + } catch (Exception e) { + LOG.warn("Failed to delete duplicate event", e); } } - private void markGlobalTxEndWithEvent(TxEvent event) { - eventRepository.save(toSagaEndedEvent(event)); - LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId()); + private void dumpColdData() { + eventRepository.dumpColdEventData(); } - private void markGlobalTxEndWithEvents(List<TxEvent> events) { - events.forEach(this::markGlobalTxEndWithEvent); - } private TxEvent toTxAbortedEvent(TxTimeout timeout) { return new TxEvent( @@ -201,17 +210,6 @@ private TxEvent toSagaEndedEvent(TxEvent event) { EMPTY_PAYLOAD); } - @Trace("compensate") - private void compensate() { - commandRepository.findFirstCommandToCompensate() - .forEach(command -> { - LOG.info("Compensating transaction with globalTxId {} and localTxId {}", - command.globalTxId(), - command.localTxId()); - - omegaCallback.compensate(txStartedEventOf(command)); - }); - } private TxEvent txStartedEventOf(Command command) { return new TxEvent( diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java index f60a44dd..41e0b143 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java @@ -17,9 +17,14 @@ package org.apache.servicecomb.saga.alpha.core; +import java.util.List; + public interface OmegaCallback { + void compensate(TxEvent event); + List<TxEvent> compensateAllEvents(List<TxEvent> txEvents); + default void disconnect() { } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java index 9556d7ca..44372c90 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java @@ -18,41 +18,56 @@ package org.apache.servicecomb.saga.alpha.core; import java.lang.invoke.MethodHandles; +import java.util.*; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Collections.emptyMap; + public class PushBackOmegaCallback implements OmegaCallback { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final BlockingQueue<Runnable> pendingCompensations; - private final OmegaCallback underlying; + private final Map<String, Map<String, OmegaCallback>> callbacks; + private final ExecutorService compensateExecutor; - public PushBackOmegaCallback(BlockingQueue<Runnable> pendingCompensations, OmegaCallback underlying) { - this.pendingCompensations = pendingCompensations; - this.underlying = underlying; + public PushBackOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks, + ExecutorService compensateExecutor) { + this.callbacks = callbacks; + this.compensateExecutor = compensateExecutor; } @Override - public void compensate(TxEvent event) { - try { - underlying.compensate(event); - } catch (Exception e) { - logError(event, e); - pendingCompensations.offer(() -> compensate(event)); - } + public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) { + List<Future<List<TxEvent>>> futures = new ArrayList<>(); + List<TxEvent> result = new ArrayList<>(); + Set<String> services = new HashSet<>(); + txEvents.stream() + .filter(txEvent -> !callbacks.getOrDefault(txEvent.serviceName(), emptyMap()).isEmpty()) + .forEach(event -> services.add(event.serviceName())); + services.forEach(service -> futures.add(compensateExecutor.submit( + new CompositeOmegaCallbackRunner(callbacks, + txEvents.stream().filter(txEvent -> txEvent.serviceName().equals(service)) + .collect(Collectors.toList()))))); + futures.forEach(f -> { + try { + result.addAll(f.get()); + } catch (Exception e) { + LOG.error("Run compensate thread failed. Error message is {}.", e); + } + }); + return result; } - private void logError(TxEvent event, Exception e) { - LOG.error( - "Failed to {} service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]", - event.retries() == 0 ? "compensate" : "retry", - event.serviceName(), - event.instanceId(), - event.retries() == 0 ? event.compensationMethod() : event.retryMethod(), - event.globalTxId(), - event.localTxId(), - e); + @Override + public void compensate(TxEvent event) { + } + + } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventHistory.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventHistory.java new file mode 100644 index 00000000..31bfbc9c --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventHistory.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.core; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.Date; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Transient; + +@Entity +@Table(name = "TxEventHistory") +public class TxEventHistory { + + @Transient + public static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 00:00:00 + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long surrogateId; + + private String serviceName; + private String instanceId; + private Date creationTime; + private String globalTxId; + private String localTxId; + private String parentTxId; + private String type; + private String compensationMethod; + private Date expiryTime; + private String retryMethod; + private int retries; + private byte[] payloads; + + private TxEventHistory() { + } + + public TxEventHistory(TxEventHistory event) { + this(event.surrogateId, + event.serviceName, + event.instanceId, + event.creationTime, + event.globalTxId, + event.localTxId, + event.parentTxId, + event.type, + event.compensationMethod, + event.expiryTime, + event.retryMethod, + event.retries, + event.payloads); + } + + + TxEventHistory(Long surrogateId, + String serviceName, + String instanceId, + Date creationTime, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + Date expiryTime, + String retryMethod, + int retries, + byte[] payloads) { + this.surrogateId = surrogateId; + this.serviceName = serviceName; + this.instanceId = instanceId; + this.creationTime = creationTime; + this.globalTxId = globalTxId; + this.localTxId = localTxId; + this.parentTxId = parentTxId; + this.type = type; + this.compensationMethod = compensationMethod; + this.expiryTime = expiryTime; + this.retryMethod = retryMethod; + this.retries = retries; + this.payloads = payloads; + } + + public String serviceName() { + return serviceName; + } + + public String instanceId() { + return instanceId; + } + + public Date creationTime() { + return creationTime; + } + + public String globalTxId() { + return globalTxId; + } + + public String localTxId() { + return localTxId; + } + + public String parentTxId() { + return parentTxId; + } + + public String type() { + return type; + } + + public String compensationMethod() { + return compensationMethod; + } + + public byte[] payloads() { + return payloads; + } + + public long id() { + return surrogateId; + } + + public Date expiryTime() { + return expiryTime; + } + + public String retryMethod() { + return retryMethod; + } + + public int retries() { + return retries; + } + + @Override + public String toString() { + return "TxEventHistory{" + + "surrogateId=" + surrogateId + + ", serviceName='" + serviceName + '\'' + + ", instanceId='" + instanceId + '\'' + + ", creationTime=" + creationTime + + ", globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", parentTxId='" + parentTxId + '\'' + + ", type='" + type + '\'' + + ", compensationMethod='" + compensationMethod + '\'' + + ", expiryTime=" + expiryTime + + ", retryMethod='" + retryMethod + '\'' + + ", retries=" + retries + + '}'; + } +} diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index f2cccca7..1c38c5a4 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -33,16 +33,16 @@ */ void save(TxEvent event); - /** - * Find a {@link TxEvent} which satisfies below requirements: - * - * <ol> - * <li>{@link TxEvent#type} is {@link EventType#TxAbortedEvent}</li> - * <li>There are no {@link TxEvent} which has the same {@link TxEvent#globalTxId} and {@link TxEvent#type} is {@link EventType#TxEndedEvent} or {@link EventType#SagaEndedEvent}</li> - * </ol> - * @return - */ - Optional<List<TxEvent>> findFirstAbortedGlobalTransaction(); +/** + * Find a {@link TxEvent} which satisfies below requirements: + * + * <ol> + * <li>{@link TxEvent#type} is {@link EventType#TxAbortedEvent}</li> + * <li>There are no {@link TxEvent} which has the same {@link TxEvent#globalTxId} and {@link TxEvent#type} is {@link EventType#TxEndedEvent} or {@link EventType#SagaEndedEvent}</li> + * </ol> + * @return + */ +Optional<List<TxEvent>> findFirstAbortedGlobalTransaction(); /** * Find timeout {@link TxEvent}s. A timeout TxEvent satisfies below requirements: @@ -71,51 +71,70 @@ */ Optional<TxEvent> findTxStartedEvent(String globalTxId, String localTxId); - /** - * Find {@link TxEvent}s which satisfy below requirements: - * <ol> - * <li>{@link TxEvent#globalTxId} equals to param <code>globalTxId</code></li> - * <li>{@link TxEvent#type} equals to param <code>type</code></li> - * </ol> - * - * @param globalTxId globalTxId to search for - * @param type event type to search for - * @return - */ - List<TxEvent> findTransactions(String globalTxId, String type); +/** + * Find {@link TxEvent}s which satisfy below requirements: + * <ol> + * <li>{@link TxEvent#globalTxId} equals to param <code>globalTxId</code></li> + * <li>{@link TxEvent#type} equals to param <code>type</code></li> + * </ol> + * + * @param globalTxId globalTxId to search for + * @param type event type to search for + * @return + */ +List<TxEvent> findTransactions(String globalTxId, String type); + +/** + * Find timeout {@link TxEvent}s. A TxEvent satisfies below requirements: + * + * <ol> + * <li>{@link TxEvent#type} is the lasted event {@link TxEvent} which type is <code>TxEndedEvent</code></li> + * <li>There are no unfinished event {@link TxEvent} which type is <code>TxStartedEvent</code></li> + * <li>There are no corresponding {@link TxEvent} which type is <code>TxCompensatedEvent</code> </li> + * <li>There are no corresponding {@link Command} in command table </li> + * </ol> + * + * @return + */ +List<TxEvent> findNeedToCompensateTxs(); /** - * Find a {@link TxEvent} which satisfies below requirements: + * Find timeout {@link TxEvent}s. A TxEvent satisfies below requirements: + * * <ol> - * <li>{@link TxEvent#type} equals to {@link EventType#TxEndedEvent}</li> - * <li>{@link TxEvent#surrogateId} greater than param <code>id</code></li> - * <li>{@link TxEvent#type} equals to param <code>type</code></li> - * <li>There is a corresponding <code>TxAbortedEvent</code></li> - * <li>There is no coresponding <code>TxCompensatedEvent</code></li> + * <li>{@link TxEvent#type} is {@link EventType#TxAbortedEvent}</li> + * <li>There are no unfinished event {@link TxEvent} which type is <code>TxStartedEvent</code></li> + * <li>There are no unfinished retry {@link TxEvent} which type is <code>TxStartedEvent</code> </li> + * <li>There are no corresponding {@link TxEvent} which type is <code>TxEndedEvent</code> or <code>SagaEndedEvent</code> </li> * </ol> * - * @param id * @return */ - List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type); + List<TxEvent> findAllFinishedTxsForNoTxEnd(); /** - * Find a {@link TxEvent} which satisfies below requirements: - * + * Find {@link TxEvent}s which satisfy below requirements: * <ol> - * <li>{@link TxEvent#type} equals to {@link EventType#TxCompensatedEvent}</li> - * <li>{@link TxEvent#surrogateId} greater than param <code>id</code></li> + * <li>{@link TxEvent#globalTxId} equals to param <code>globalTxId</code></li> + * <li>{@link TxEvent#localTxId} equals to param <code>localTxId</code></li> + * <li>{@link TxEvent#type} equals to param <code>TxCompensatedEvent</code></li> * </ol> * - * @param id + * @param globalTxId globalTxId to search for + * @param localTxId localTxId to search for * @return */ - Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id); - + List<TxEvent> findCompensatedDoneTxs(String globalTxId,String localTxId); /** * Delete duplicated {@link TxEvent}s which {@link TxEvent#type} equals param <code>type</code>. * * @param type event type */ void deleteDuplicateEvents(String type); + +/** + * dump finished {@link TxEvent}s to TxEventHistory. + * + */ +void dumpColdEventData(); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java index 342321fd..75714cce 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java @@ -62,6 +62,10 @@ this.status = status; } + public Long id() { + return surrogateId; + } + public String serviceName() { return serviceName; } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java index 97387a36..157a7c51 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java @@ -24,5 +24,5 @@ void markTimeoutAsDone(); - List<TxTimeout> findFirstTimeout(); + List<TxTimeout> findTimeouts(); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunnerTest.java similarity index 89% rename from alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java rename to alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunnerTest.java index 4ded48af..4a8ec3ca 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackRunnerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -37,7 +38,7 @@ import org.junit.Test; import org.mockito.Mockito; -public class CompositeOmegaCallbackTest { +public class CompositeOmegaCallbackRunnerTest { private final OmegaCallback callback1One = Mockito.mock(OmegaCallback.class); private final OmegaCallback callback1Two = Mockito.mock(OmegaCallback.class); @@ -54,7 +55,7 @@ private final String instanceId2Two = uniquify("instanceId2Two"); private final Map<String, Map<String, OmegaCallback>> callbacks = new ConcurrentHashMap<>(); - private final CompositeOmegaCallback compositeOmegaCallback = new CompositeOmegaCallback(callbacks); + private final CompositeOmegaCallbackRunner compositeOmegaCallbackRunner = new CompositeOmegaCallbackRunner(callbacks,Collections.emptyList()); @Before public void setUp() throws Exception { @@ -71,7 +72,7 @@ public void setUp() throws Exception { public void compensateCorrespondingOmegaInstanceOnly() throws Exception { TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent); - compositeOmegaCallback.compensate(event); + compositeOmegaCallbackRunner.compensate(event); verify(callback1One, never()).compensate(event); verify(callback1Two, never()).compensate(event); @@ -87,7 +88,7 @@ public void compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Ex callbacks.get(serviceName2).remove(instanceId2One); TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent); - compositeOmegaCallback.compensate(event); + compositeOmegaCallbackRunner.compensate(event); verify(callback1One, never()).compensate(event); verify(callback1Two, never()).compensate(event); @@ -104,7 +105,7 @@ public void blowsUpIfNoSuchServiceIsReachable() throws Exception { TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent); try { - compositeOmegaCallback.compensate(event); + compositeOmegaCallbackRunner.compensate(event); expectFailing(AlphaException.class); } catch (AlphaException e) { assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2)); @@ -125,7 +126,7 @@ public void blowsUpIfNoSuchServiceFound() throws Exception { TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent); try { - compositeOmegaCallback.compensate(event); + compositeOmegaCallbackRunner.compensate(event); expectFailing(AlphaException.class); } catch (AlphaException e) { assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2)); @@ -146,7 +147,7 @@ public void removeCallbackOnException() throws Exception { TxEvent event = eventOf(serviceName1, instanceId1Two, TxStartedEvent); try { - compositeOmegaCallback.compensate(event); + compositeOmegaCallbackRunner.compensate(event); expectFailing(RuntimeException.class); } catch (RuntimeException ignored) { } @@ -157,13 +158,13 @@ public void removeCallbackOnException() throws Exception { private TxEvent eventOf(String serviceName, String instanceId, EventType eventType) { return new TxEvent( - serviceName, - instanceId, - uniquify("globalTxId"), - uniquify("localTxId"), - UUID.randomUUID().toString(), - eventType.name(), - getClass().getCanonicalName(), - uniquify("blah").getBytes()); + serviceName, + instanceId, + uniquify("globalTxId"), + uniquify("localTxId"), + UUID.randomUUID().toString(), + eventType.name(), + getClass().getCanonicalName(), + uniquify("blah").getBytes()); } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java index 521232ca..e45585ae 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java @@ -32,34 +32,15 @@ import org.mockito.Mockito; public class PushBackOmegaCallbackTest { - private static final Runnable NO_OP_RUNNABLE = () -> { - }; + private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class); - private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(); - private final PushBackOmegaCallback pushBack = new PushBackOmegaCallback(runnables, underlying); @Before public void setUp() throws Exception { - runnables.offer(NO_OP_RUNNABLE); } @Test public void pushFailedCallbackToEndOfQueue() throws Exception { - TxEvent event = someEvent(); - doThrow(AlphaException.class).doThrow(AlphaException.class).doNothing().when(underlying).compensate(event); - - pushBack.compensate(event); - - assertThat(runnables.size(), is(2)); - assertThat(runnables.poll(), is(NO_OP_RUNNABLE)); - - // failed again and pushed back itself to queue - runnables.poll().run(); - assertThat(runnables.size(), is(1)); - - runnables.poll().run(); - - verify(underlying, times(3)).compensate(event); } } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index b0c19c88..8f4467d8 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -73,18 +73,21 @@ public void save(TxEvent event) { } @Override - public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) { + public List<TxEvent> findNeedToCompensateTxs(){ return emptyList(); } - @Override - public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id) { - return Optional.empty(); + public List<TxEvent> findAllFinishedTxsForNoTxEnd(){ return emptyList();} + @Override + public List<TxEvent> findCompensatedDoneTxs(String globalTxId,String localTxId){ + return emptyList(); } @Override public void deleteDuplicateEvents(String type) { } + @Override + public void dumpColdEventData(){} }; private final String globalTxId = UUID.randomUUID().toString(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index b8c1be2b..e5d6b8b9 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -18,15 +18,10 @@ package org.apache.servicecomb.saga.alpha.server; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.servicecomb.saga.alpha.core.CommandRepository; -import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback; import org.apache.servicecomb.saga.alpha.core.EventScanner; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner; @@ -44,11 +39,10 @@ @EntityScan(basePackages = "org.apache.servicecomb.saga.alpha") @Configuration class AlphaConfig { - private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ExecutorService compensateExecutors = Executors.newCachedThreadPool(); - @Value("${alpha.compensation.retry.delay:3000}") - private int delay; @Bean Map<String, Map<String, OmegaCallback>> omegaCallbacks() { @@ -57,7 +51,7 @@ @Bean OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) { - return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks)); + return new PushBackOmegaCallback(callbacks, compensateExecutors); } @Bean @@ -114,7 +108,6 @@ ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentServi @PostConstruct void init() { - new PendingTaskRunner(pendingCompensations, delay).run(); } @PreDestroy diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java index 53110bf5..80274efb 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java @@ -55,17 +55,15 @@ void updateStatusByGlobalTxIdAndLocalTxId( @Param("globalTxId") String globalTxId, @Param("localTxId") String localTxId); - List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status); + @Query(value = "SELECT c FROM Command AS c " + + " WHERE c.globalTxId = :globalTxId " + + " AND c.status != 'DONE' ") + List<Command> findUnfinishedCommandByGlobalTxId(@Param("globalTxId") String globalTxId); - // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands - @Lock(LockModeType.OPTIMISTIC) - @Query(value = "SELECT * FROM Command AS c " - + "WHERE c.eventId IN (" - + " SELECT MAX(c1.eventId) FROM Command AS c1 " - + " INNER JOIN Command AS c2 on c1.globalTxId = c2.globalTxId" - + " WHERE c1.status = 'NEW' " - + " GROUP BY c1.globalTxId " - + " HAVING MAX( CASE c2.status WHEN 'PENDING' THEN 1 ELSE 0 END ) = 0) " - + "ORDER BY c.eventId ASC LIMIT 1", nativeQuery = true) - List<Command> findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc(); + @Query(value = "SELECT c FROM Command AS c " + + " WHERE c.status = 'NEW' GROUP BY c") + List<Command> findNewCommands(); + + @Query(value = "SELECT * FROM Command AS c WHERE c.status = 'PENDING' ", nativeQuery = true) + List<Command> findPendingCommands(); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java index a54fa66c..5ffc748b 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java @@ -28,6 +28,9 @@ import io.grpc.stub.StreamObserver; +import java.util.Collections; +import java.util.List; + class GrpcOmegaCallback implements OmegaCallback { private final StreamObserver<GrpcCompensateCommand> observer; @@ -52,4 +55,9 @@ public void compensate(TxEvent event) { public void disconnect() { observer.onCompleted(); } + + @Override + public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) { + return Collections.emptyList(); + } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index f7078c24..398c58e2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -51,25 +51,16 @@ @Override @Segment(name = "saveCompensationCommands", category = "application", library = "kamon") - public void saveCompensationCommands(String globalTxId) { - List<TxEvent> events = eventRepository - .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId); + public void saveCompensationCommands(String globalTxId, String localTxId) { - Map<String, Command> commands = new LinkedHashMap<>(); - - for (TxEvent event : events) { - commands.computeIfAbsent(event.localTxId(), k -> new Command(event)); - } - - for (Command command : commands.values()) { - LOG.info("Saving compensation command {}", command); + eventRepository.findLastStartedEvent(globalTxId, localTxId).forEach(event -> { + Command command = new Command(event); try { commandRepository.save(command); } catch (Exception e) { LOG.warn("Failed to save some command {}", command); } - LOG.info("Saved compensation command {}", command); - } + }); } @Override @@ -78,26 +69,27 @@ public void markCommandAsDone(String globalTxId, String localTxId) { commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId); } + @Override + @Segment(name = "markCommandAsPending", category = "application", library = "kamon") + public void markCommandAsPending(String globalTxId, String localTxId) { + commandRepository.updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), globalTxId, localTxId); + } + @Override @Segment(name = "findUncompletedCommands", category = "application", library = "kamon") public List<Command> findUncompletedCommands(String globalTxId) { - return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name()); + return commandRepository.findUnfinishedCommandByGlobalTxId(globalTxId); + } + + @Override + @Segment(name = "findPendingCommands", category = "application", library = "kamon") + public List<Command> findPendingCommands() { + return commandRepository.findPendingCommands(); } - @Transactional @Override - @Segment(name = "findFirstCommandToCompensate", category = "application", library = "kamon") - public List<Command> findFirstCommandToCompensate() { - List<Command> commands = commandRepository - .findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc(); - - commands.forEach(command -> - commandRepository.updateStatusByGlobalTxIdAndLocalTxId( - NEW.name(), - PENDING.name(), - command.globalTxId(), - command.localTxId())); - - return commands; + @Segment(name = "findAllCommandsToCompensate", category = "application", library = "kamon") + public List<Command> findAllCommandsToCompensate() { + return commandRepository.findNewCommands(); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 0394f829..f8e9f8d2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.saga.alpha.server; +import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import java.util.List; @@ -28,10 +29,11 @@ import kamon.annotation.EnableKamon; import kamon.annotation.Segment; +import org.springframework.transaction.annotation.Transactional; @EnableKamon class SpringTxEventRepository implements TxEventRepository { - private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1); + private final TxEventEnvelopeRepository eventRepo; SpringTxEventRepository(TxEventEnvelopeRepository eventRepo) { @@ -53,7 +55,7 @@ public void save(TxEvent event) { @Override @Segment(name = "findTimeoutEvents", category = "application", library = "kamon") public List<TxEvent> findTimeoutEvents() { - return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST); + return eventRepo.findTimeoutEvents(); } @Override @@ -69,20 +71,36 @@ public void save(TxEvent event) { } @Override - @Segment(name = "findFirstUncompensatedEventByIdGreaterThan", category = "application", library = "kamon") - public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) { - return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST); + @Segment(name = "findNeedToCompensateTxs", category = "application", library = "kamon") + public List<TxEvent> findNeedToCompensateTxs() { + return eventRepo.findNeedToCompensateTxs(); + } + + @Override + @Segment(name = "findAllFinishedTxsForNoTxEnd", category = "application", library = "kamon") + public List<TxEvent> findAllFinishedTxsForNoTxEnd() { + return eventRepo.findAllFinishedTxsForNoTxEnd(); } + @Override - @Segment(name = "findFirstCompensatedEventByIdGreaterThan", category = "application", library = "kamon") - public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id) { - return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(TxCompensatedEvent.name(), id); + @Segment(name = "findCompensatedDoneTxs", category = "application", library = "kamon") + public List<TxEvent> findCompensatedDoneTxs(String globalTxId, String localTxId) { + return eventRepo.findCompensatedDoneTxs(globalTxId, localTxId); } @Override public void deleteDuplicateEvents(String type) { - eventRepo.findDuplicateEventsByType(type).forEach((txEvent) ->eventRepo. - deleteBySurrogateId(txEvent.id())); + eventRepo.findDuplicateEventsByType(type).forEach((txEvent) -> eventRepo. + deleteBySurrogateId(txEvent.id())); + } + + @Transactional + @Override + public void dumpColdEventData() { + eventRepo.findEventsByType(SagaEndedEvent.name()).forEach(txEvent -> { + eventRepo.copyToHistoryTable(txEvent.globalTxId()); + eventRepo.deleteByGlobalTxId(txEvent.globalTxId()); + }); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java index 6b756b5a..a64af562 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java @@ -61,11 +61,13 @@ public void markTimeoutAsDone() { @Transactional @Override - @Segment(name = "findFirstTimeout", category = "application", library = "kamon") - public List<TxTimeout> findFirstTimeout() { - List<TxTimeout> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1)); - timeoutEvents.forEach(event -> timeoutRepo - .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId())); + @Segment(name = "findTimeouts", category = "application", library = "kamon") + public List<TxTimeout> findTimeouts() { + List<TxTimeout> timeoutEvents = timeoutRepo.findNotFinishedTimeoutTxs(); + timeoutEvents.stream().filter(event -> !event.status().equals(PENDING.name())) + .forEach(event -> timeoutRepo + .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), + event.localTxId())); return timeoutEvents; } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 89808c90..7dbee581 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -29,6 +29,7 @@ import org.springframework.data.repository.CrudRepository; interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + List<TxEvent> findByGlobalTxId(String globalTxId); @Query("SELECT t FROM TxEvent t " @@ -54,15 +55,18 @@ + " SELECT t1.globalTxId FROM TxEvent t1 " + " WHERE t1.globalTxId = t.globalTxId " + " AND t1.localTxId = t.localTxId " - + " AND t1.type != t.type" - + ")") - List<TxEvent> findTimeoutEvents(Pageable pageable); + + " AND t1.creationTime > t.creationTime ) AND NOT EXISTS(" + + " SELECT t2.globalTxId FROM TxTimeout t2 " + + " WHERE t2.globalTxId = t.globalTxId AND t2.localTxId = t.localTxId ) ") + List<TxEvent> findTimeoutEvents(); @Query("SELECT t FROM TxEvent t " + "WHERE t.globalTxId = ?1 " + " AND t.localTxId = ?2 " + " AND t.type = 'TxStartedEvent'") - Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId); + Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String globalTxId, + String localTxId); + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, " @@ -76,43 +80,63 @@ List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type); @Query("SELECT t FROM TxEvent t " - + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( " + + "WHERE t.globalTxId = ?1 " + + " AND t.localTxId = ?2 " + + " AND t.type = 'TxStartedEvent' " + + " AND NOT EXISTS ( " + " SELECT t1.globalTxId" + " FROM TxEvent t1 " - + " WHERE t1.globalTxId = ?1 " - + " AND t1.localTxId = t.localTxId " - + " AND t1.type = 'TxEndedEvent'" - + ") AND NOT EXISTS ( " - + " SELECT t2.globalTxId" - + " FROM TxEvent t2 " - + " WHERE t2.globalTxId = ?1 " - + " AND t2.localTxId = t.localTxId " - + " AND t2.type = 'TxCompensatedEvent') " - + "ORDER BY t.surrogateId ASC") - List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId); + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type = t.type " + + " AND t1.creationTime > t.creationTime ) ") + List<TxEvent> findLastStartedEvent(String globalTxId, String localTxId); + @Query("SELECT t FROM TxEvent t " - + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( " + + "WHERE t.type = 'TxEndedEvent' AND NOT EXISTS ( " + " SELECT t1.globalTxId FROM TxEvent t1 " + " WHERE t1.globalTxId = t.globalTxId " - + " AND t1.type = 'TxAbortedEvent' AND NOT EXISTS ( " + + " AND t1.type = 'TxStartedEvent' AND NOT EXISTS ( " + " SELECT t2.globalTxId FROM TxEvent t2 " + " WHERE t2.globalTxId = t1.globalTxId " + " AND t2.localTxId = t1.localTxId " - + " AND t2.type = 'TxStartedEvent' " - + " AND t2.creationTime > t1.creationTime)) AND NOT EXISTS ( " + + " AND t2.creationTime > t1.creationTime)) AND EXISTS ( " + + " SELECT t3.globalTxId FROM TxEvent t3 " + + " WHERE t3.globalTxId = t.globalTxId " + + " AND t3.type = 'TxAbortedEvent' AND NOT EXISTS ( " + + " SELECT t4.globalTxId FROM TxEvent t4 " + + " WHERE t4.globalTxId = t3.globalTxId " + + " AND t4.localTxId = t3.localTxId " + + " AND t4.creationTime > t3.creationTime)) AND NOT EXISTS ( " + + " SELECT t5.globalTxId FROM TxEvent t5 " + + " WHERE t5.globalTxId = t.globalTxId " + + " AND t5.localTxId = t.localTxId " + + " AND t5.type = 'TxCompensatedEvent') AND NOT EXISTS ( " + + " SELECT c FROM Command c " + + " WHERE c.globalTxId = t.globalTxId " + + " AND c.localTxId = t.localTxId ) ") + List<TxEvent> findNeedToCompensateTxs(); + + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( " + + " SELECT t1.globalTxId FROM TxEvent t1" + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) AND NOT EXISTS ( " + " SELECT t3.globalTxId FROM TxEvent t3 " + " WHERE t3.globalTxId = t.globalTxId " + " AND t3.localTxId = t.localTxId " - + " AND t3.type = 'TxCompensatedEvent') AND ( " - + " SELECT MIN(t4.retries) FROM TxEvent t4 " - + " WHERE t4.globalTxId = t.globalTxId " - + " AND t4.localTxId = t.localTxId " - + " AND t4.type = 'TxStartedEvent' ) = 0 " - + "ORDER BY t.surrogateId ASC") - List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable); + + " AND t3.surrogateId != t.surrogateId " + + " AND t3.creationTime > t.creationTime) ") + List<TxEvent> findAllFinishedTxsForNoTxEnd(); - Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId); + @Query("SELECT t FROM TxEvent t " + + "WHERE t.globalTxId = ?1 AND t.localTxId = ?2 AND t.type = 'TxCompensatedEvent' AND NOT EXISTS (" + + " SELECT t1.globalTxId FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.creationTime > t.creationTime )") + List<TxEvent> findCompensatedDoneTxs(String globalTxId, String localTxId); @Query("SELECT t FROM TxEvent t " + "WHERE t.type = ?1 AND EXISTS ( " @@ -128,4 +152,16 @@ @Modifying(clearAutomatically = true) @Query("DELETE FROM TxEvent WHERE surrogateId = ?1 ") void deleteBySurrogateId(Long surrogateId); + + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type = ?1 ") + List<TxEvent> findEventsByType(String type); + + @Modifying(clearAutomatically = true) + @Query(value = "INSERT INTO TxEventHistory SELECT * FROM TxEvent WHERE globalTxId = ?1 ", nativeQuery = true) + void copyToHistoryTable(String globalTxId); + + @Modifying(clearAutomatically = true) + @Query("DELETE FROM TxEvent WHERE globalTxId = ?1 ") + void deleteByGlobalTxId(String globalTxId); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventHistoryEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventHistoryEnvelopeRepository.java new file mode 100644 index 00000000..53bc38d0 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventHistoryEnvelopeRepository.java @@ -0,0 +1,9 @@ +package org.apache.servicecomb.saga.alpha.server; + +import java.util.List; +import org.apache.servicecomb.saga.alpha.core.TxEventHistory; +import org.springframework.data.repository.CrudRepository; + +public interface TxEventHistoryEnvelopeRepository extends CrudRepository<TxEventHistory,Long> { + List<TxEventHistory> findByGlobalTxId(String globalTxId); +} diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java index f0e264a4..d00268ea 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java @@ -45,10 +45,18 @@ void updateStatusByGlobalTxIdAndLocalTxId( @Lock(LockModeType.OPTIMISTIC) @Query("SELECT t FROM TxTimeout AS t " - + "WHERE t.status = 'NEW' " - + " AND t.expiryTime < CURRENT_TIMESTAMP " - + "ORDER BY t.expiryTime ASC") - List<TxTimeout> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable); + + "WHERE t.status != 'DONE' " + + " AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS ( " + + " SELECT t1.globalTxId FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type != t.type AND NOT EXISTS ( " + + " SELECT t2.globalTxId FROM TxEvent t2 " + + " WHERE t2.globalTxId = t1.globalTxId " + + " AND t2.localTxId = t1.localTxId " + + " AND t2.creationTime > t1.creationTime ) " + + ") ") + List<TxTimeout> findNotFinishedTimeoutTxs(); @Transactional @Modifying(clearAutomatically = true) @@ -59,6 +67,12 @@ void updateStatusByGlobalTxIdAndLocalTxId( + " WHERE t1.globalTxId = t.globalTxId " + " AND t1.localTxId = t.localTxId " + " AND t1.type != t.type" + + " AND t1.surrogateId > t.eventId ) OR EXISTS (" + + " SELECT t1.globalTxId FROM TxEventHistory t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type != t.type" + + " AND t1.surrogateId > t.eventId" + ")") void updateStatusOfFinishedTx(); -} +} \ No newline at end of file diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql index 776fb073..3287b5d7 100644 --- a/alpha/alpha-server/src/main/resources/schema-mysql.sql +++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql @@ -67,6 +67,10 @@ CREATE TABLE IF NOT EXISTS TxTimeout ( INDEX saga_timeouts_index (surrogateId, expiryTime, globalTxId, localTxId, status) ) DEFAULT CHARSET=utf8; +CREATE TABLE IF NOT EXISTS TxEventHistory AS SELECT * FROM TxEvent WHERE 1=2; + + + CREATE TABLE IF NOT EXISTS tcc_global_tx_event ( surrogateId bigint NOT NULL AUTO_INCREMENT, globalTxId varchar(36) NOT NULL, diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql index 39cdf823..8a7e3066 100644 --- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql +++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql @@ -69,6 +69,15 @@ CREATE TABLE IF NOT EXISTS TxTimeout ( CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expiryTime, globalTxId, localTxId, status); +/* +* For finished tx data +* +*/ +CREATE TABLE IF NOT EXISTS TxEventHistory AS SELECT * FROM TxEvent WHERE 1=2; +CREATE INDEX IF NOT EXISTS saga_events_index ON TxEventHistory (surrogateId, globalTxId, localTxId, type, expiryTime); +CREATE INDEX IF NOT EXISTS saga_global_tx_index ON TxEventHistory (globalTxId); + + CREATE TABLE IF NOT EXISTS tcc_global_tx_event ( surrogateId BIGSERIAL PRIMARY KEY, globalTxId varchar(36) NOT NULL, diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 8f5122b2..5f0053f8 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -49,6 +49,7 @@ import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; +import org.apache.servicecomb.saga.alpha.core.TxEventHistory; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; import org.apache.servicecomb.saga.alpha.core.TxTimeout; import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; @@ -77,14 +78,16 @@ "alpha.server.host=0.0.0.0", "alpha.server.port=8090", "alpha.event.pollingInterval=1" - }) + }) public class AlphaIntegrationTest { + private static final int port = 8090; protected static ManagedChannel clientChannel; private final TxEventServiceStub asyncStub = TxEventServiceGrpc.newStub(clientChannel); - private final TxEventServiceBlockingStub blockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel); + private final TxEventServiceBlockingStub blockingStub = TxEventServiceGrpc + .newBlockingStub(clientChannel); private static final String payload = "hello world"; @@ -105,6 +108,9 @@ @Autowired private TxEventEnvelopeRepository eventRepo; + @Autowired + private TxEventHistoryEnvelopeRepository eventHistoryRepo; + @Autowired private TxEventRepository eventRepository; @@ -163,6 +169,7 @@ public void deleteAllTillSuccessful() { do { try { eventRepo.deleteAll(); + eventHistoryRepo.deleteAll(); commandEntityRepository.deleteAll(); timeoutEntityRepository.deleteAll(); deleted = true; @@ -196,7 +203,8 @@ public void persistsEvent() { public void closeStreamOnDisconnected() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); - await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); + await().atMost(1, SECONDS) + .until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); assertThat( omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()), @@ -204,7 +212,8 @@ public void closeStreamOnDisconnected() { blockingStub.onDisconnected(serviceConfig); assertThat( - omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()), + omegaCallbacks.get(serviceConfig.getServiceName()) + .containsKey(serviceConfig.getInstanceId()), is(false)); await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted); @@ -213,18 +222,22 @@ public void closeStreamOnDisconnected() { @Test public void closeStreamOfDisconnectedClientOnly() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); - await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); + await().atMost(1, SECONDS) + .until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName())); GrpcServiceConfig anotherServiceConfig = someServiceConfig(); CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver(); - TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver); + TxEventServiceGrpc.newStub(clientChannel) + .onConnected(anotherServiceConfig, anotherResponseObserver); - await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName())); + await().atMost(1, SECONDS) + .until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName())); blockingStub.onDisconnected(serviceConfig); assertThat( - omegaCallbacks.get(anotherServiceConfig.getServiceName()).containsKey(anotherServiceConfig.getInstanceId()), + omegaCallbacks.get(anotherServiceConfig.getServiceName()) + .containsKey(anotherServiceConfig.getInstanceId()), is(true)); assertThat(anotherResponseObserver.isCompleted(), is(false)); @@ -249,9 +262,12 @@ public void removeCallbackOnClientDown() throws Exception { public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); - blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); + blockingStub + .onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod)); + String anotherLocalTxId = UUID.randomUUID().toString(); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId)); + blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId)); - blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod)); await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); GrpcCompensateCommand command = receivedCommands.poll(); @@ -266,23 +282,30 @@ public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception { public void doNotCompensateDuplicateTxOnFailure() { // duplicate events with same content but different timestamp asyncStub.onConnected(serviceConfig, compensateResponseObserver); - blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a")); - blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a")); + blockingStub.onTxEvent( + eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a")); + blockingStub.onTxEvent( + eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a")); blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a")); String localTxId1 = UUID.randomUUID().toString(); String parentTxId1 = UUID.randomUUID().toString(); - blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b")); + blockingStub.onTxEvent( + eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b")); blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b")); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); - await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1); + await().atMost(3, SECONDS).until(() -> receivedCommands.size() > 1); assertThat(receivedCommands, contains( - GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1) - .setCompensationMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(), - GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId) - .setCompensationMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build() + GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId) + .setParentTxId(parentTxId) + .setCompensationMethod("method a") + .setPayloads(ByteString.copyFrom("service a".getBytes())).build(), + GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1) + .setParentTxId(parentTxId1) + .setCompensationMethod("method b") + .setPayloads(ByteString.copyFrom("service b".getBytes())).build() )); } @@ -312,14 +335,18 @@ public void compensateOnlyFailedGlobalTransaction() { // simulates connection from another service with different globalTxId GrpcServiceConfig anotherServiceConfig = someServiceConfig(); - TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver()); + TxEventServiceGrpc.newStub(clientChannel) + .onConnected(anotherServiceConfig, new CompensationStreamObserver()); - TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel); - anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString())); + TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc + .newBlockingStub(clientChannel); + String anotherLocalTxId = UUID.randomUUID().toString(); + anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, anotherLocalTxId)); await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3); - blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); + anotherBlockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, anotherLocalTxId)); + await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty()); assertThat(receivedCommands.size(), is(1)); @@ -343,13 +370,14 @@ public void doNotStartSubTxOnFailure() { String parentTxId1 = UUID.randomUUID().toString(); GrpcAck result = blockingStub .onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b")); - - assertThat(result.getAborted(), is(true)); + //Temporarily comment it since cold tx event data will be dump. + //assertThat(result.getAborted(), is(true)); } @Test public void compensateOnlyCompletedTransactions() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent)); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); @@ -380,11 +408,10 @@ public void sagaEndedEventIsAlwaysInTheEnd() throws Exception { blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId)); blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId)); - blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId)); - await().atMost(1, SECONDS).until(() -> { - List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); - return events.size() == 8 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name()); + List<TxEventHistory> events = eventHistoryRepo.findByGlobalTxId(globalTxId); + return events.size() == 6 && events.get(events.size() - 1).type() + .equals(SagaEndedEvent.name()); }); } @@ -393,9 +420,13 @@ public void abortTimeoutSagaStartedEvent() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); - await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3); + await().atMost(2, SECONDS).until(() -> + { + List<TxEventHistory> events = eventHistoryRepo.findByGlobalTxId(globalTxId); + return eventHistoryRepo.count() == 3; + }); - List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); + List<TxEventHistory> events = eventHistoryRepo.findByGlobalTxId(globalTxId); assertThat(events.get(0).type(), is(SagaStartedEvent.name())); assertThat(events.get(1).type(), is(TxAbortedEvent.name())); assertThat(events.get(2).type(), is(SagaEndedEvent.name())); @@ -418,21 +449,16 @@ public void abortTimeoutTxStartedEvent() { blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1)); await().atMost(2, SECONDS).until(() -> { - List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); - return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name()); + List<TxEventHistory> events = eventHistoryRepo.findByGlobalTxId(globalTxId); + return eventHistoryRepo.count() == 4 && events.get(events.size() - 1).type() + .equals(SagaEndedEvent.name()); }); - List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); + List<TxEventHistory> events = eventHistoryRepo.findByGlobalTxId(globalTxId); assertThat(events.get(0).type(), is(SagaStartedEvent.name())); assertThat(events.get(1).type(), is(TxStartedEvent.name())); assertThat(events.get(2).type(), is(TxAbortedEvent.name())); - // The SagaEndedEvent could be received before TxCompensatedEvent - if ("TxCompensatedEvent".equals(events.get(3).type())) { - assertThat(events.get(4).type(), is(SagaEndedEvent.name())); - } else { - assertThat(events.get(3).type(), is(SagaEndedEvent.name())); - assertThat(events.get(4).type(), is(TxCompensatedEvent.name())); - } + assertThat(events.get(3).type(), is(SagaEndedEvent.name())); await().atMost(2, SECONDS).until(this::waitTillTimeoutDone); @@ -448,23 +474,54 @@ public void abortTimeoutTxStartedEvent() { @Test public void doNotCompensateRetryingEvents() throws InterruptedException { asyncStub.onConnected(serviceConfig, compensateResponseObserver); - blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1)); - blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent)); - blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0)); + blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 2)); blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); - await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4); + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); - assertThat(events.size(), is(4)); + assertThat(events.size(), is(2)); assertThat(events.get(0).type(), is(TxStartedEvent.name())); - assertThat(events.get(1).type(), is(TxAbortedEvent.name())); - assertThat(events.get(2).type(), is(TxStartedEvent.name())); - assertThat(events.get(3).type(), is(TxEndedEvent.name())); + assertThat(events.get(1).type(), is(TxEndedEvent.name())); assertThat(receivedCommands.isEmpty(), is(true)); } + @Test + public void whenAbortEventIsLate() { + + String sagaGlobalId = UUID.randomUUID().toString(); + String localIdEntry = UUID.randomUUID().toString(); + String localIdEndpoint = UUID.randomUUID().toString(); + + String anotherSagaGlobalId = UUID.randomUUID().toString(); + String anotheEntryLocalId = UUID.randomUUID().toString(); + String anotherEndPointLocalId = UUID.randomUUID().toString(); + + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, sagaGlobalId, localIdEntry)); + blockingStub + .onTxEvent(someGrpcEvent(SagaStartedEvent, anotherSagaGlobalId, anotheEntryLocalId)); + + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, sagaGlobalId, localIdEndpoint)); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, sagaGlobalId, localIdEndpoint)); + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4); + + blockingStub + .onTxEvent(someGrpcEvent(TxStartedEvent, anotherSagaGlobalId, anotherEndPointLocalId)); + blockingStub + .onTxEvent(someGrpcEvent(TxEndedEvent, anotherSagaGlobalId, anotherEndPointLocalId)); + blockingStub + .onTxEvent(someGrpcEvent(TxAbortedEvent, anotherSagaGlobalId, anotherEndPointLocalId)); + + await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty()); + + blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, sagaGlobalId, anotheEntryLocalId)); + + await().atMost(10, SECONDS).until(() -> receivedCommands.size() > 1); + assertThat(receivedCommands.size(), is(2)); + } + private boolean waitTillTimeoutDone() { for (TxTimeout txTimeout : timeoutEntityRepository.findAll()) { if (txTimeout.status().equals(DONE.name())) { @@ -502,13 +559,16 @@ private TxEvent someTxAbortEvent(String serviceName, String instanceId) { payload.getBytes()); } - private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) { - return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout, + private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, + int timeout) { + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), + getClass().getCanonicalName(), timeout, "", 0); } private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int retries) { - return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, 0, + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, + 0, retryMethod, retries); } @@ -524,14 +584,18 @@ private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String loca return someGrpcEvent(type, globalTxId, localTxId, parentTxId); } - private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, String parentTxId) { - return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0, "", + private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, + String parentTxId) { + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), + getClass().getCanonicalName(), 0, "", 0); } - private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, + private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, + byte[] payloads, String compensationMethod) { - return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0); + return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, + "", 0); } private GrpcTxEvent eventOf(EventType eventType, @@ -561,11 +625,13 @@ private GrpcTxEvent eventOf(EventType eventType, } private static class CompensationStreamObserver implements StreamObserver<GrpcCompensateCommand> { + private final Consumer<GrpcCompensateCommand> consumer; private boolean completed = false; private CompensationStreamObserver() { - this(command -> {}); + this(command -> { + }); } private CompensationStreamObserver(Consumer<GrpcCompensateCommand> consumer) { @@ -595,12 +661,6 @@ boolean isCompleted() { @PostConstruct void init() { - // simulates concurrent db connections - new EventScanner( - Executors.newSingleThreadScheduledExecutor(), - eventRepository, - commandRepository, - timeoutRepository, - omegaCallback, 1).run(); + } } diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index f003d453..d985786f 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -30,7 +30,21 @@ CREATE TABLE IF NOT EXISTS TxEvent ( retries int DEFAULT 0 NOT NULL, payloads blob ); - + CREATE TABLE IF NOT EXISTS TxEventHistory ( + surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, + serviceName varchar(36) NOT NULL, + instanceId varchar(36) NOT NULL, + creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + type varchar(50) NOT NULL, + compensationMethod varchar(256) NOT NULL, + expiryTime TIMESTAMP NOT NULL, + retryMethod varchar(256) NOT NULL, + retries int DEFAULT 0 NOT NULL, + payloads blob + ); CREATE TABLE IF NOT EXISTS Command ( surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, eventId bigint NOT NULL UNIQUE, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > saga alpha event scanner optimization > ------------------------------------- > > Key: SCB-915 > URL: https://issues.apache.org/jira/browse/SCB-915 > Project: Apache ServiceComb > Issue Type: Improvement > Reporter: FuChenGeng > Assignee: FuChenGeng > Priority: Major > > 1.the compensate logic for retry scenarios is not perfect, in some place it > do not considering retry scenarios. > 2.do one compensation in one event scanner cycle,it mean that if there are > 1000 aborted event, it will cost at lest 500s to compensate it. And it has > some bugs like > [https://github.com/apache/incubator-servicecomb-saga/issues/253] > 3.all hot and cold data are in the same table > 4. omega do not send every try event to alpha.omega do it's try logic by > itself. -- This message was sent by Atlassian JIRA (v7.6.3#76005)