This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 6f87dc9b12a198e3661457d7af608fc5d3ee806a Author: Lei Zhang <[email protected]> AuthorDate: Mon Jul 15 14:06:38 2019 +0800 SCB-1372 Add state machine metrics --- .../pack/alpha/fsm/FsmAutoConfiguration.java | 28 ++-- .../servicecomb/pack/alpha/fsm/SagaActor.java | 8 + ...Channel.java => AbstractActorEventChannel.java} | 36 +++-- .../fsm/channel/ActiveMQActorEventChannel.java | 10 +- .../alpha/fsm/channel/KafkaActorEventChannel.java | 10 +- .../alpha/fsm/channel/MemoryActorEventChannel.java | 17 ++- .../alpha/fsm/channel/RedisActorEventChannel.java | 10 +- .../pack/alpha/fsm/metrics/MetricsBean.java | 165 +++++++++++++++++++++ .../MetricsService.java} | 21 +-- .../pack/alpha/fsm/sink/SagaActorEventSender.java | 14 ++ .../spring/integration/akka/SagaDataExtension.java | 54 +++++-- .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 31 ++-- .../pack/alpha/fsm/SagaIntegrationTest.java | 8 + 13 files changed, 327 insertions(+), 85 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java index fcf5cec..ef8248d 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm; +import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER; import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER; import akka.actor.ActorSystem; @@ -24,6 +25,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.Map; import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel; import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel; @@ -51,6 +53,7 @@ public class FsmAutoConfiguration { public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) { ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment)); SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext); + SAGA_DATA_EXTENSION_PROVIDER.get(system).initialize(applicationContext); return system; } @@ -66,33 +69,38 @@ public class FsmAutoConfiguration { } @Bean - public ActorEventSink actorEventSink(){ - return new SagaActorEventSender(); + public MetricsService metricsService(){ + return new MetricsService(); + } + + @Bean + public ActorEventSink actorEventSink(MetricsService metricsService){ + return new SagaActorEventSender(metricsService); } @Bean @ConditionalOnMissingBean(ActorEventChannel.class) @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true) - public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){ - return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize); + public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){ + return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,metricsService); } @Bean @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq") - public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){ - return new ActiveMQActorEventChannel(actorEventSink); + public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){ + return new ActiveMQActorEventChannel(actorEventSink, metricsService); } @Bean @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka") - public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){ - return new KafkaActorEventChannel(actorEventSink); + public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){ + return new KafkaActorEventChannel(actorEventSink, metricsService); } @Bean @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis") - public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){ - return new RedisActorEventChannel(actorEventSink); + public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){ + return new RedisActorEventChannel(actorEventSink, metricsService); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index cb97cb1..f173a6f 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -59,6 +59,9 @@ public class SagaActor extends private final String persistenceId; + private long sagaBeginTime; + private long sagaEndTime; + public SagaActor(String persistenceId) { this.persistenceId = persistenceId; @@ -67,6 +70,8 @@ public class SagaActor extends when(SagaActorState.IDLE, matchEvent(SagaStartedEvent.class, (event, data) -> { + sagaBeginTime = System.currentTimeMillis(); + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter(); SagaStartedDomain domainEvent = new SagaStartedDomain(event); if (event.getTimeout() > 0) { return goTo(SagaActorState.READY) @@ -358,6 +363,9 @@ public class SagaActor extends data.setEndTime(new Date()); SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) .stopSagaData(data.getGlobalTxId(), data); + sagaEndTime = System.currentTimeMillis(); + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter(); + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(sagaEndTime - sagaBeginTime); } ) ); diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java similarity index 55% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java index 3fcceb4..278d7d6 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java @@ -17,27 +17,35 @@ package org.apache.servicecomb.pack.alpha.fsm.channel; -import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** - * Pub/Sub - * */ +public abstract class AbstractActorEventChannel implements ActorEventChannel { -public class RedisActorEventChannel implements ActorEventChannel { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ActorEventSink actorEventSink; + protected final MetricsService metricsService; + protected final ActorEventSink actorEventSink; - public RedisActorEventChannel( - ActorEventSink actorEventSink) { + public abstract void sendTo(BaseEvent event); + + public AbstractActorEventChannel( + ActorEventSink actorEventSink, + MetricsService metricsService) { this.actorEventSink = actorEventSink; + this.metricsService = metricsService; } - @Override - public void send(BaseEvent event){ - throw new UnsupportedOperationException("Doesn't implement yet!"); + public void send(BaseEvent event) { + long begin = System.currentTimeMillis(); + metricsService.metrics().doEventReceived(); + try { + this.sendTo(event); + metricsService.metrics().doEventAccepted(); + } catch (Exception ex) { + metricsService.metrics().doEventRejected(); + } + long end = System.currentTimeMillis(); + metricsService.metrics().doEventAvgTime(end - begin); } + } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java index 9f0e024..1d3b08e 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel; import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,17 +28,16 @@ import org.slf4j.LoggerFactory; * Queue * */ -public class ActiveMQActorEventChannel implements ActorEventChannel { +public class ActiveMQActorEventChannel extends AbstractActorEventChannel { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ActorEventSink actorEventSink; public ActiveMQActorEventChannel( - ActorEventSink actorEventSink) { - this.actorEventSink = actorEventSink; + ActorEventSink actorEventSink, MetricsService metricsService) { + super(actorEventSink, metricsService); } @Override - public void send(BaseEvent event){ + public void sendTo(BaseEvent event){ throw new UnsupportedOperationException("Doesn't implement yet!"); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java index a4d2525..bff14c3 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java @@ -19,21 +19,21 @@ package org.apache.servicecomb.pack.alpha.fsm.channel; import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaActorEventChannel implements ActorEventChannel { +public class KafkaActorEventChannel extends AbstractActorEventChannel { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ActorEventSink actorEventSink; public KafkaActorEventChannel( - ActorEventSink actorEventSink) { - this.actorEventSink = actorEventSink; + ActorEventSink actorEventSink, MetricsService metricsService) { + super(actorEventSink, metricsService); } @Override - public void send(BaseEvent event){ + public void sendTo(BaseEvent event){ throw new UnsupportedOperationException("Doesn't implement yet!"); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java index 1af2432..e4e01b7 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java @@ -21,29 +21,30 @@ import java.lang.invoke.MethodHandles; import java.util.concurrent.LinkedBlockingQueue; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MemoryActorEventChannel implements ActorEventChannel { +public class MemoryActorEventChannel extends AbstractActorEventChannel { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ActorEventSink actorEventSink; private final LinkedBlockingQueue<BaseEvent> eventQueue; private int size; - public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) { + public MemoryActorEventChannel(ActorEventSink actorEventSink, int size, + MetricsService metricsService) { + super(actorEventSink, metricsService); this.size = size > 0 ? size : Integer.MAX_VALUE; eventQueue = new LinkedBlockingQueue(this.size); - this.actorEventSink = actorEventSink; - new Thread(new EventConsumer(),"MemoryActorEventChannel").start(); + new Thread(new EventConsumer(), "MemoryActorEventChannel").start(); } @Override - public void send(BaseEvent event){ - try{ + public void sendTo(BaseEvent event) { + try { eventQueue.put(event); - }catch (Exception e){ + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java index 3fcceb4..71319dd 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel; import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,17 +28,16 @@ import org.slf4j.LoggerFactory; * Pub/Sub * */ -public class RedisActorEventChannel implements ActorEventChannel { +public class RedisActorEventChannel extends AbstractActorEventChannel { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ActorEventSink actorEventSink; public RedisActorEventChannel( - ActorEventSink actorEventSink) { - this.actorEventSink = actorEventSink; + ActorEventSink actorEventSink, MetricsService metricsService) { + super(actorEventSink, metricsService); } @Override - public void send(BaseEvent event){ + public void sendTo(BaseEvent event){ throw new UnsupportedOperationException("Doesn't implement yet!"); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java new file mode 100644 index 0000000..4e73776 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.metrics; + +import com.google.common.util.concurrent.AtomicDouble; +import java.util.concurrent.atomic.AtomicLong; + +public class MetricsBean { + + private AtomicLong eventReceived = new AtomicLong(); + private AtomicLong eventAccepted = new AtomicLong(); + private AtomicLong eventRejected = new AtomicLong(); + private AtomicDouble eventAvgTime = new AtomicDouble();//milliseconds moving average + private AtomicLong actorReceived = new AtomicLong(); + private AtomicLong actorAccepted = new AtomicLong(); + private AtomicLong actorRejected = new AtomicLong(); + private AtomicDouble actorAvgTime = new AtomicDouble();//milliseconds moving average + private AtomicLong sagaBeginCounter = new AtomicLong(); + private AtomicLong sagaEndCounter = new AtomicLong(); + private AtomicDouble sagaAvgTime = new AtomicDouble();//milliseconds moving average + private AtomicLong committed = new AtomicLong(); + private AtomicLong compensated = new AtomicLong(); + private AtomicLong suspended = new AtomicLong(); + + public void doEventReceived() { + eventReceived.incrementAndGet(); + } + + public void doEventAccepted() { + eventAccepted.incrementAndGet(); + } + + public void doEventRejected() { + eventReceived.decrementAndGet(); + eventRejected.incrementAndGet(); + } + + public void doEventAvgTime(long time) { + if (eventAvgTime.get() == 0) { + eventAvgTime.set(time); + } else { + eventAvgTime.set((eventAvgTime.get() + time) / 2); + } + } + + public void doActorReceived() { + actorReceived.incrementAndGet(); + } + + public void doActorAccepted() { + actorAccepted.incrementAndGet(); + } + + public void doActorRejected() { + actorReceived.decrementAndGet(); + actorRejected.incrementAndGet(); + } + + public void doActorAvgTime(long time) { + if (actorAvgTime.get() == 0) { + actorAvgTime.set(time); + } else { + actorAvgTime.set((actorAvgTime.get() + time) / 2); + } + } + + public void doSagaBeginCounter() { + sagaBeginCounter.incrementAndGet(); + } + + public void doSagaEndCounter() { + sagaEndCounter.incrementAndGet(); + } + + public void doSagaAvgTime(long time) { + if (sagaAvgTime.get() == 0) { + sagaAvgTime.set(time); + } else { + sagaAvgTime.set((sagaAvgTime.get() + time) / 2); + } + } + + public void doCommitted() { + committed.incrementAndGet(); + } + + public void doCompensated() { + compensated.incrementAndGet(); + } + + public void doSuspended() { + suspended.incrementAndGet(); + } + + public long getEventReceived() { + return eventReceived.get(); + } + + public long getEventAccepted() { + return eventAccepted.get(); + } + + public long getEventRejected() { + return eventRejected.get(); + } + + public double getEventAvgTime() { + return eventAvgTime.get(); + } + + public long getActorReceived() { + return actorReceived.get(); + } + + public long getActorAccepted() { + return actorAccepted.get(); + } + + public long getActorRejected() { + return actorRejected.get(); + } + + public double getActorAvgTime() { + return actorAvgTime.get(); + } + + public long getSagaBeginCounter() { + return sagaBeginCounter.get(); + } + + public long getSagaEndCounter() { + return sagaEndCounter.get(); + } + + public double getSagaAvgTime() { + return sagaAvgTime.get(); + } + + public long getCommitted() { + return committed.get(); + } + + public long getCompensated() { + return compensated.get(); + } + + public long getSuspended() { + return suspended.get(); + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java similarity index 52% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java index a4d2525..e1b675f 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java @@ -15,25 +15,14 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.channel; +package org.apache.servicecomb.pack.alpha.fsm.metrics; -import java.lang.invoke.MethodHandles; -import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +public class MetricsService { -public class KafkaActorEventChannel implements ActorEventChannel { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ActorEventSink actorEventSink; + private final MetricsBean metrics = new MetricsBean(); - public KafkaActorEventChannel( - ActorEventSink actorEventSink) { - this.actorEventSink = actorEventSink; + public MetricsBean metrics() { + return metrics; } - @Override - public void send(BaseEvent event){ - throw new UnsupportedOperationException("Doesn't implement yet!"); - } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java index 879af40..aa25250 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.servicecomb.pack.alpha.fsm.SagaActor; import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -37,12 +38,21 @@ public class SagaActorEventSender implements ActorEventSink { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final MetricsService metricsService; + @Autowired ActorSystem system; + public SagaActorEventSender( + MetricsService metricsService) { + this.metricsService = metricsService; + } + private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS)); public void send(BaseEvent event) { + long begin = System.currentTimeMillis(); + metricsService.metrics().doActorReceived(); try{ if (LOG.isDebugEnabled()) { LOG.debug("send {} ", event.toString()); @@ -59,7 +69,11 @@ public class SagaActorEventSender implements ActorEventSink { final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration()); saga.tell(event, ActorRef.noSender()); } + metricsService.metrics().doActorAccepted(); + long end = System.currentTimeMillis(); + metricsService.metrics().doActorAvgTime(end - begin); }catch (Exception ex){ + metricsService.metrics().doActorRejected(); throw new RuntimeException(ex); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java index 7266c54..7f8eaac 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java @@ -22,13 +22,16 @@ import akka.actor.ExtendedActorSystem; import akka.actor.Extension; import java.lang.invoke.MethodHandles; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension(); //TODO We could use test profile the enable this kind feature @@ -40,22 +43,25 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { } public static class SagaDataExt implements Extension { + //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>(); private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap(); private String lastGlobalTxId; private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap); + private volatile ApplicationContext applicationContext; + private MetricsService metricsService; public SagaDataExt() { // Just to avoid the overflow of the OldGen for stress testing // Delete after SagaData persistence - if(autoCleanSagaDataMap){ + if (autoCleanSagaDataMap) { new Thread(cleanMemForTest).start(); } } public void putSagaData(String globalTxId, SagaData sagaData) { //if(!globalTxIds.contains(globalTxId)){ - lastGlobalTxId = globalTxId; + lastGlobalTxId = globalTxId; // globalTxIds.add(globalTxId); //} sagaDataMap.put(globalTxId, sagaData); @@ -65,6 +71,13 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { // TODO save SagaDate to database and clean sagaDataMap this.putSagaData(globalTxId, sagaData); lastGlobalTxId = globalTxId; + if (sagaData.getLastState() == SagaActorState.COMMITTED) { + this.metricsService.metrics().doCommitted(); + } else if (sagaData.getLastState() == SagaActorState.COMPENSATED) { + this.metricsService.metrics().doCompensated(); + } else if (sagaData.getLastState() == SagaActorState.SUSPENDED) { + this.metricsService.metrics().doSuspended(); + } } public SagaData getSagaData(String globalTxId) { @@ -83,9 +96,32 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { public SagaData getLastSagaData() { return getSagaData(lastGlobalTxId); } + + public void doSagaBeginCounter() { + this.metricsService.metrics().doSagaBeginCounter(); + } + + public void doSagaEndCounter() { + this.metricsService.metrics().doSagaEndCounter(); + } + + public void doSagaAvgTime(long time) { + this.metricsService.metrics().doSagaAvgTime(time); + } + + public void setMetricsService( + MetricsService metricsService) { + this.metricsService = metricsService; + } + + public void initialize(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + this.setMetricsService(this.applicationContext.getBean(MetricsService.class)); + } } static class CleanMemForTest implements Runnable { + final ConcurrentHashMap<String, SagaData> sagaDataMap; public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) { @@ -94,16 +130,16 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { @Override public void run() { - while (true){ - try{ + while (true) { + try { sagaDataMap.clear(); - }catch (Exception e){ - LOG.error(e.getMessage(),e); - }finally { + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { try { Thread.sleep(10000); } catch (InterruptedException e) { - LOG.error(e.getMessage(),e); + LOG.error(e.getMessage(), e); } } } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index c68c5d6..783142c 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm; +import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -33,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.junit.AfterClass; @@ -44,6 +46,8 @@ public class SagaActorTest { static ActorSystem system; + static MetricsService metricsService = new MetricsService(); + private static Map<String,Object> getPersistenceMemConfig(){ Map<String, Object> map = new HashMap<>(); map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem"); @@ -69,6 +73,7 @@ public class SagaActorTest { public static void setup() { SagaDataExtension.autoCleanSagaDataMap=false; system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig())); + SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService); } @AfterClass @@ -139,7 +144,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); sagaData.getTxEntityMap().forEach((k, v) -> { @@ -227,7 +232,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), recoveredSaga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); sagaData.getTxEntityMap().forEach((k, v) -> { @@ -279,7 +284,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 1); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); @@ -341,7 +346,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 2); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -414,7 +419,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -487,7 +492,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -549,7 +554,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); @@ -627,7 +632,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -700,7 +705,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED); @@ -769,7 +774,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -823,7 +828,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); sagaData.getTxEntityMap().forEach((k, v) -> { @@ -886,7 +891,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); sagaData.getTxEntityMap().forEach((k, v) -> { @@ -948,7 +953,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index 69d2870..d17de0e 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorSystem; import java.util.UUID; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; @@ -52,6 +53,9 @@ public class SagaIntegrationTest { @Autowired SagaActorEventSender sagaActorEventSender; + @Autowired + MetricsService metricsService; + @BeforeClass public static void setup(){ SagaDataExtension.autoCleanSagaDataMap=false; @@ -77,6 +81,10 @@ public class SagaIntegrationTest { assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); + assertEquals(metricsService.metrics().getActorReceived(),8); + assertEquals(metricsService.metrics().getActorAccepted(),8); + assertEquals(metricsService.metrics().getSagaBeginCounter(),1); + assertEquals(metricsService.metrics().getSagaEndCounter(),1); } @Test
