This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 10c54afaafc5e2048f9a6cc7c34340cf0f4c77f8 Author: Lei Zhang <[email protected]> AuthorDate: Tue Sep 10 17:26:57 2019 +0800 SCB-1368 Add shard region selection Actor --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 28 ++++++---- .../pack/alpha/fsm/SagaShardRegionActor.java | 65 ++++++++++++++++++++++ 2 files changed, 81 insertions(+), 12 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index fc06255..ba4b0ff 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -59,13 +59,13 @@ public class SagaActor extends return Props.create(SagaActor.class, persistenceId); } - private final String persistenceId; + private String persistenceId; private long sagaBeginTime; private long sagaEndTime; - public SagaActor(String persistenceId) { - this.persistenceId = persistenceId; + public SagaActor() { + this.persistenceId = getSelf().path().name(); startWith(SagaActorState.IDLE, SagaData.builder().build()); @@ -380,6 +380,12 @@ public class SagaActor extends @Override public SagaData applyEvent(DomainEvent event, SagaData data) { + if (this.recoveryRunning()) { + LOG.info("SagaActor recovery {}",event.getEvent()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("SagaActor apply event {}", event.getEvent()); + } // log event to SagaData if (event.getEvent() != null && !(event .getEvent() instanceof ComponsitedCheckEvent)) { @@ -404,6 +410,7 @@ public class SagaActor extends .compensationMethod(domainEvent.getCompensationMethod()) .payloads(domainEvent.getPayloads()) .state(domainEvent.getState()) + .beginTime(domainEvent.getEvent().getCreateTime()) .build(); data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); } else { @@ -412,7 +419,7 @@ public class SagaActor extends } else if (event instanceof UpdateTxEventDomain) { UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event; TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId()); - txEntity.setEndTime(new Date()); + txEntity.setEndTime(domainEvent.getEvent().getCreateTime()); if (domainEvent.getState() == TxState.COMMITTED) { txEntity.setState(domainEvent.getState()); } else if (domainEvent.getState() == TxState.FAILED) { @@ -441,27 +448,24 @@ public class SagaActor extends } }); } else if (domainEvent.getState() == SagaActorState.SUSPENDED) { - data.setEndTime(new Date()); + data.setEndTime(event.getEvent().getCreateTime()); data.setTerminated(true); data.setSuspendedType(domainEvent.getSuspendedType()); } else if (domainEvent.getState() == SagaActorState.COMPENSATED) { - data.setEndTime(new Date()); + data.setEndTime(event.getEvent().getCreateTime()); data.setTerminated(true); } else if (domainEvent.getState() == SagaActorState.COMMITTED) { - data.setEndTime(new Date()); + data.setEndTime(event.getEvent().getCreateTime()); data.setTerminated(true); } } - if (LOG.isDebugEnabled()) { - LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId()); - } return data; } @Override public void onRecoveryCompleted() { - if (LOG.isDebugEnabled()) { - LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId()); + if(stateName() != SagaActorState.IDLE){ + LOG.info("SagaActor {} recovery completed, state={}", stateData().getGlobalTxId(), stateName()); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java new file mode 100644 index 0000000..d43ba85 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java @@ -0,0 +1,65 @@ +package org.apache.servicecomb.pack.alpha.fsm; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.cluster.sharding.ClusterSharding; +import akka.cluster.sharding.ClusterShardingSettings; +import akka.cluster.sharding.ShardRegion; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; + +public class SagaShardRegionActor extends AbstractActor { + + private final ActorRef workerRegion; + + static ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() { + @Override + public String entityId(Object message) { + if (message instanceof BaseEvent) { + return ((BaseEvent) message).getGlobalTxId(); + } else { + return null; + } + } + + @Override + public Object entityMessage(Object message) { + return message; + } + + @Override + public String shardId(Object message) { + int numberOfShards = 100; + if (message instanceof BaseEvent) { + String actorId = ((BaseEvent) message).getGlobalTxId(); + return String.valueOf(actorId.hashCode() % numberOfShards); + } else if (message instanceof ShardRegion.StartEntity) { + String actorId = ((ShardRegion.StartEntity) message).entityId(); + return String.valueOf(actorId.hashCode() % numberOfShards); + } else { + return null; + } + } + }; + + public SagaShardRegionActor() { + ActorSystem system = getContext().getSystem(); + ClusterShardingSettings settings = ClusterShardingSettings.create(system); + workerRegion = ClusterSharding.get(system) + .start( + "saga-actor", + Props.create(SagaActor.class), + settings, + messageExtractor); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .matchAny(msg -> { + workerRegion.tell(msg, getSelf()); + }) + .build(); + } +}
