This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 9eaf13b3e7e7444ad18f28198941df905c177e4a
Author: Lei Zhang <[email protected]>
AuthorDate: Wed Jul 10 18:49:18 2019 +0800

    SCB-1321 Optimize termination of SagaData cache for stress test
---
 .../spring/integration/akka/SagaDataExtension.java | 76 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 8 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index f0d8f58..1a49527 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -20,12 +20,16 @@ package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
 import akka.actor.AbstractExtensionId;
 import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
-
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
 
   @Override
@@ -34,22 +38,78 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   }
 
   public static class SagaDataExt implements Extension {
-    private ConcurrentSkipListMap<String, SagaData> sagaDataMap = new ConcurrentSkipListMap();
+    private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+    private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+    private String lastGlobalTxId;
+    private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+
+    public SagaDataExt() {
+      // Just to avoid the overflow of the OldGen for stress testing
+      // Delete after SagaData persistence
+      new Thread(cleanMemForTest).start();
+    }
 
-    public void putSagaData(String globalTxId, SagaData sagaData){
+    public void putSagaData(String globalTxId, SagaData sagaData) {
+      if(!globalTxIds.contains(globalTxId)){
+        globalTxIds.add(globalTxId);
+      }
       sagaDataMap.put(globalTxId, sagaData);
     }
 
-    public SagaData getSagaData(String globalTxId){
+    public void stopSagaData(String globalTxId, SagaData sagaData) {
+      // TODO save SagaDate to database and clean sagaDataMap
+      this.putSagaData(globalTxId, sagaData);
+      lastGlobalTxId = globalTxId;
+    }
+
+    public SagaData getSagaData(String globalTxId) {
+      // TODO If globalTxId does not exist in sagaDataMap then
+      // load from the database
       return sagaDataMap.get(globalTxId);
     }
 
-    public void clearSagaData(){
+    // Only test
+    public void clearSagaData() {
+      globalTxIds.clear();
       sagaDataMap.clear();
     }
 
-    public SagaData getLastSagaData(){
-      return sagaDataMap.lastEntry().getValue();
+    public SagaData getLastSagaData() {
+      return sagaDataMap.get(lastGlobalTxId);
+    }
+  }
+
+  static class CleanMemForTest implements Runnable {
+    final ConcurrentLinkedQueue<String> globalTxIds;
+    final ConcurrentHashMap<String, SagaData> sagaDataMap;
+
+    public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
+      this.globalTxIds = globalTxIds;
+      this.sagaDataMap = sagaDataMap;
+    }
+
+    @Override
+    public void run() {
+      while (true){
+        try{
+          if(!globalTxIds.isEmpty()){
+            int cache_size = globalTxIds.size()-5000;
+            while(cache_size>0){
+              sagaDataMap.remove(globalTxIds.poll());
+              cache_size--;
+            }
+          }
+        }catch (Exception e){
+          LOG.error(e.getMessage(),e);
+        }finally {
+          LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
+          try {
+            Thread.sleep(60000);
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage(),e);
+          }
+        }
+      }
     }
   }
 }
