This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit ac55eec256a85f1d6adabb6a8a08e6ffb0c1613b Author: Lei Zhang <[email protected]> AuthorDate: Thu Jul 11 14:44:19 2019 +0800 SCB-1321 Optimize alpha throughput --- .../pack/alpha/fsm/FsmAutoConfiguration.java | 46 ++++++++++++-- .../fsm/channel/ActiveMQActorEventChannel.java | 47 +++++++++++++++ .../pack/alpha/fsm/channel/ActorEventChannel.java | 24 ++++++++ .../alpha/fsm/channel/KafkaActorEventChannel.java | 43 +++++++++++++ .../alpha/fsm/channel/MemoryActorEventChannel.java | 70 ++++++++++++++++++++++ .../alpha/fsm/channel/RedisActorEventChannel.java | 47 +++++++++++++++ .../servicecomb/pack/alpha/fsm/model/SagaData.java | 3 +- .../pack/alpha/fsm/sink/ActorEventSink.java | 25 ++++++++ .../SagaActorEventSender.java} | 47 ++++++++------- .../spring/integration/akka/SagaDataExtension.java | 33 +++++----- .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 1 + .../pack/alpha/fsm/SagaIntegrationTest.java | 37 +++++++----- .../servicecomb/pack/alpha/server/AlphaConfig.java | 8 +-- .../pack/alpha/server/AlphaEventController.java | 1 - .../alpha/server/fsm/GrpcSagaEventService.java | 10 ++-- .../src/main/resources/application.yaml | 6 ++ .../alpha/server/fsm/AlphaIntegrationFsmTest.java | 3 +- 17 files changed, 379 insertions(+), 72 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java index 922e40b..fcf5cec 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java @@ -23,9 +23,17 @@ import akka.actor.ActorSystem; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.Map; -import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; +import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; +import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -36,6 +44,9 @@ import org.springframework.core.env.ConfigurableEnvironment; @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"}) public class FsmAutoConfiguration { + @Value("${alpha.feature.akka.channel.memory.size:-1}") + int memoryEventChannelMemorySize; + @Bean public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) { ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment)); @@ -50,13 +61,38 @@ public class FsmAutoConfiguration { } @Bean - public SagaEventActorEventSender sagaEventConsumer(){ - return new SagaEventActorEventSender(); + public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){ + return new EventSubscribeBeanPostProcessor(); } @Bean - public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){ - return new EventSubscribeBeanPostProcessor(); + public ActorEventSink actorEventSink(){ + return new SagaActorEventSender(); + } + + @Bean + @ConditionalOnMissingBean(ActorEventChannel.class) + @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true) + public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){ + return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize); + } + + @Bean + @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq") + public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){ + return new ActiveMQActorEventChannel(actorEventSink); + } + + @Bean + @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka") + public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){ + return new KafkaActorEventChannel(actorEventSink); + } + + @Bean + @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis") + public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){ + return new RedisActorEventChannel(actorEventSink); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java new file mode 100644 index 0000000..515f29c --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel; + +import java.lang.invoke.MethodHandles; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Queue + * */ + +public class ActiveMQActorEventChannel implements ActorEventChannel { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ActorEventSink actorEventSink; + + public ActiveMQActorEventChannel( + ActorEventSink actorEventSink) { + this.actorEventSink = actorEventSink; + } + + @Override + public void send(BaseEvent event){ + try{ + throw new UnsupportedOperationException(); + }catch (Exception e){ + throw new RuntimeException(e); + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java new file mode 100644 index 0000000..f026d91 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; + +public interface ActorEventChannel { + void send(BaseEvent event); +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java new file mode 100644 index 0000000..7539069 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel; + +import java.lang.invoke.MethodHandles; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaActorEventChannel implements ActorEventChannel { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ActorEventSink actorEventSink; + + public KafkaActorEventChannel( + ActorEventSink actorEventSink) { + this.actorEventSink = actorEventSink; + } + + @Override + public void send(BaseEvent event){ + try{ + throw new UnsupportedOperationException(); + }catch (Exception e){ + throw new RuntimeException(e); + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java new file mode 100644 index 0000000..1af2432 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MemoryActorEventChannel implements ActorEventChannel { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ActorEventSink actorEventSink; + private final LinkedBlockingQueue<BaseEvent> eventQueue; + private int size; + + public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) { + this.size = size > 0 ? size : Integer.MAX_VALUE; + eventQueue = new LinkedBlockingQueue(this.size); + this.actorEventSink = actorEventSink; + new Thread(new EventConsumer(),"MemoryActorEventChannel").start(); + } + + @Override + public void send(BaseEvent event){ + try{ + eventQueue.put(event); + }catch (Exception e){ + throw new RuntimeException(e); + } + } + + class EventConsumer implements Runnable { + + @Override + public void run() { + while (true) { + try { + BaseEvent event = eventQueue.peek(); + if (event != null) { + actorEventSink.send(event); + eventQueue.poll(); + } else { + Thread.sleep(10); + } + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + } + } + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java new file mode 100644 index 0000000..f055eec --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel; + +import java.lang.invoke.MethodHandles; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pub/Sub + * */ + +public class RedisActorEventChannel implements ActorEventChannel { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ActorEventSink actorEventSink; + + public RedisActorEventChannel( + ActorEventSink actorEventSink) { + this.actorEventSink = actorEventSink; + } + + @Override + public void send(BaseEvent event){ + try{ + throw new UnsupportedOperationException(); + }catch (Exception e){ + throw new RuntimeException(e); + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java index 59c1e4e..e9a5f60 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.servicecomb.pack.alpha.core.fsm.PackSagaEvent; import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; @@ -38,7 +39,7 @@ public class SagaData implements Serializable { private boolean terminated; private SagaActorState lastState; private AtomicLong compensationRunningCounter = new AtomicLong(); - private Map<String,TxEntity> txEntityMap = new HashMap<>(); + private Map<String,TxEntity> txEntityMap = new ConcurrentHashMap<>(); private List<BaseEvent> events = new LinkedList<>(); public String getServiceName() { diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java new file mode 100644 index 0000000..73ba220 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.sink; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; + +public interface ActorEventSink { + + void send(BaseEvent event) throws Exception; +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java similarity index 52% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java index 84d7914..cdc0828 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java @@ -15,47 +15,54 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.event.consumer; +package org.apache.servicecomb.pack.alpha.fsm.sink; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.util.Timeout; import java.lang.invoke.MethodHandles; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.servicecomb.pack.alpha.fsm.SagaActor; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +public class SagaActorEventSender implements ActorEventSink { -@Component -public class SagaEventActorEventSender { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Autowired ActorSystem system; - private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>(); + private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS)); public void send(BaseEvent event) { - if(LOG.isDebugEnabled()){ - LOG.debug("send {} ", event.toString()); - } try{ - ActorRef saga; - if(sagaCache.containsKey(event.getGlobalTxId())){ - saga = sagaCache.get(event.getGlobalTxId()); - }else{ - saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId()); - sagaCache.put(event.getGlobalTxId(), saga); + if (LOG.isDebugEnabled()) { + LOG.debug("send {} ", event.toString()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("send {} ", event.toString()); } - saga.tell(event, ActorRef.noSender()); - if(LOG.isDebugEnabled()){ - LOG.debug("tell {} to {}", event.toString(),saga); + if (event instanceof SagaStartedEvent) { + final ActorRef saga = system + .actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId()); + saga.tell(event, ActorRef.noSender()); + } else { + ActorSelection actorSelection = system + .actorSelection("/user/" + event.getGlobalTxId()); + final Future<ActorRef> actorRefFuture = actorSelection.resolveOne(lookupTimeout); + final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration()); + saga.tell(event, ActorRef.noSender()); } }catch (Exception ex){ - throw ex; + throw new RuntimeException(ex); } } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java index c1690c6..ae8d43d 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension(); + public static boolean autoCleanSagaDataMap = true; // Only for Test @Override public SagaDataExt createExtension(ExtendedActorSystem system) { @@ -38,22 +39,24 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { } public static class SagaDataExt implements Extension { - private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>(); + //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>(); private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap(); private String lastGlobalTxId; - private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap); + private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap); public SagaDataExt() { // Just to avoid the overflow of the OldGen for stress testing // Delete after SagaData persistence - new Thread(cleanMemForTest).start(); + if(autoCleanSagaDataMap){ + new Thread(cleanMemForTest).start(); + } } public void putSagaData(String globalTxId, SagaData sagaData) { - if(!globalTxIds.contains(globalTxId)){ + //if(!globalTxIds.contains(globalTxId)){ lastGlobalTxId = globalTxId; - globalTxIds.add(globalTxId); - } + // globalTxIds.add(globalTxId); + //} sagaDataMap.put(globalTxId, sagaData); } @@ -71,7 +74,8 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { // Only test public void clearSagaData() { - globalTxIds.clear(); + //globalTxIds.clear(); + lastGlobalTxId = null; sagaDataMap.clear(); } @@ -81,11 +85,9 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { } static class CleanMemForTest implements Runnable { - final ConcurrentLinkedQueue<String> globalTxIds; final ConcurrentHashMap<String, SagaData> sagaDataMap; - public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) { - this.globalTxIds = globalTxIds; + public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) { this.sagaDataMap = sagaDataMap; } @@ -93,19 +95,12 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { public void run() { while (true){ try{ - if(!globalTxIds.isEmpty()){ - int cache_size = globalTxIds.size()-5000; - while(cache_size>0){ - sagaDataMap.remove(globalTxIds.poll()); - cache_size--; - } - } + sagaDataMap.clear(); }catch (Exception e){ LOG.error(e.getMessage(),e); }finally { - LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size()); try { - Thread.sleep(60000); + Thread.sleep(10000); } catch (InterruptedException e) { LOG.error(e.getMessage(),e); } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index 1b4d84b..505d923 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -67,6 +67,7 @@ public class SagaActorTest { @BeforeClass public static void setup() { + SagaDataExtension.autoCleanSagaDataMap=false; system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig())); } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index 82ae48a..69d2870 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -23,15 +23,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import akka.actor.ActorSystem; -import com.google.common.eventbus.EventBus; import java.util.UUID; -import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; +import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @@ -39,6 +38,7 @@ import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = {SagaApplication.class}, properties = { "alpha.feature.akka.enabled=true", + "alpha.feature.akka.channel.type=memory", "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem", "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal", "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local", @@ -50,7 +50,12 @@ public class SagaIntegrationTest { ActorSystem system; @Autowired - SagaEventActorEventSender sagaEventActorEventSender; + SagaActorEventSender sagaActorEventSender; + + @BeforeClass + public static void setup(){ + SagaDataExtension.autoCleanSagaDataMap=false; + } @Test public void successfulTest() { @@ -59,7 +64,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -79,7 +84,7 @@ public class SagaIntegrationTest { final String globalTxId = UUID.randomUUID().toString(); final String localTxId_1 = UUID.randomUUID().toString(); SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { @@ -99,7 +104,7 @@ public class SagaIntegrationTest { final String localTxId_1 = UUID.randomUUID().toString(); final String localTxId_2 = UUID.randomUUID().toString(); SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -120,7 +125,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -142,7 +147,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -164,7 +169,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -186,7 +191,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -208,7 +213,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -231,7 +236,7 @@ public class SagaIntegrationTest { final String localTxId_3 = UUID.randomUUID().toString(); final int timeout = 5; // second SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(timeout + 2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -253,7 +258,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -275,7 +280,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -297,7 +302,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventActorEventSender.send(event); + sagaActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java index 17589e9..d2e94b1 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java @@ -29,7 +29,7 @@ import javax.annotation.PreDestroy; import com.google.common.eventbus.EventBus; import org.apache.servicecomb.pack.alpha.core.*; -import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; +import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel; import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService; import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService; import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner; @@ -168,11 +168,11 @@ public class AlphaConfig { @Bean @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true") - ServerStartable serverStartableMy(GrpcServerConfig serverConfig, + ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig, Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService, - TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException { + TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException { ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus, - new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService); + new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService); new Thread(bootstrap::start).start(); tccPendingTaskRunner.start(); tccEventScanner.start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java index 2e6b8e0..b7a344a 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java @@ -43,7 +43,6 @@ import kamon.annotation.Trace; @Controller @RequestMapping("/saga") @Profile("test") -@ConditionalOnProperty(name = "alpha.feature.akka.enabled", havingValue = "false", matchIfMissing = true) // Only export this Controller for test class AlphaEventController { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java index 3cfb931..dcf5cf3 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java @@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import kamon.annotation.Trace; import org.apache.servicecomb.pack.alpha.core.OmegaCallback; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; +import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel; import org.apache.servicecomb.pack.common.EventType; import org.apache.servicecomb.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; @@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build(); private final Map<String, Map<String, OmegaCallback>> omegaCallbacks; - private final SagaEventActorEventSender sagaEventActorEventSender; + private final ActorEventChannel actorEventChannel; - public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender, + public GrpcSagaEventService(ActorEventChannel actorEventChannel, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { - this.sagaEventActorEventSender = sagaEventActorEventSender; + this.actorEventChannel = actorEventChannel; this.omegaCallbacks = omegaCallbacks; } @@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { } if (event != null) { event.setCreateTime(new Date()); - sagaEventActorEventSender.send(event); + actorEventChannel.send(event); } responseObserver.onNext(ok ? ALLOW : REJECT); responseObserver.onCompleted(); diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml index 1aa7a68..fa1b35a 100644 --- a/alpha/alpha-server/src/main/resources/application.yaml +++ b/alpha/alpha-server/src/main/resources/application.yaml @@ -21,6 +21,11 @@ alpha: server: host: 0.0.0.0 port: 8080 + feature: + akka: + enabled: false + channel: + type: memory spring: datasource: @@ -45,6 +50,7 @@ eureka: metadataMap: servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port} + akkaConfig: akka.persistence.journal.plugin: akka.persistence.journal.inmem akka.persistence.journal.leveldb.dir: target/example/journal diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java index d44d728..d0902e5 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java @@ -68,7 +68,8 @@ public class AlphaIntegrationFsmTest { @BeforeClass public static void beforeClass() { - omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build()); + omegaEventSender.configClient(NettyChannelBuilder.forAddress("0.0.0.0", port).usePlaintext().build()); + SagaDataExtension.autoCleanSagaDataMap=false; } @AfterClass
