This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 89f652b030f39dec13d32af6d375565e16c3c5ee Author: Lei Zhang <[email protected]> AuthorDate: Wed Jul 10 23:45:10 2019 +0800 SCB-1321 Remove EventBus between gRPC and Akka --- .../pack/alpha/fsm/FsmAutoConfiguration.java | 12 ++------ ...onsumer.java => SagaEventActorEventSender.java} | 17 +++++------- .../pack/alpha/fsm/SagaIntegrationTest.java | 32 +++++++++++----------- .../servicecomb/pack/alpha/server/AlphaConfig.java | 5 ++-- .../alpha/server/fsm/GrpcSagaEventService.java | 10 +++---- 5 files changed, 34 insertions(+), 42 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java index 5371159..922e40b 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java @@ -20,11 +20,10 @@ package org.apache.servicecomb.pack.alpha.fsm; import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER; import akka.actor.ActorSystem; -import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.Map; -import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer; +import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -50,14 +49,9 @@ public class FsmAutoConfiguration { return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader())); } - @Bean(name = "sagaEventBus") - public EventBus sagaEventBus() { - return new EventBus(); - } - @Bean - public SagaEventConsumer sagaEventConsumer(){ - return new SagaEventConsumer(); + public SagaEventActorEventSender sagaEventConsumer(){ + return new SagaEventActorEventSender(); } @Bean diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java similarity index 85% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java index 758880b..84d7914 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java @@ -19,31 +19,28 @@ package org.apache.servicecomb.pack.alpha.fsm.event.consumer; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import com.google.common.eventbus.Subscribe; import java.lang.invoke.MethodHandles; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.servicecomb.pack.alpha.fsm.SagaActor; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; -public class SagaEventConsumer { +@Component +public class SagaEventActorEventSender { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Autowired ActorSystem system; - private Map<String,ActorRef> sagaCache = new HashMap<>(); + private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>(); - /** - * Receive fsm message - * */ - @Subscribe - public void receiveSagaEvent(BaseEvent event) { + public void send(BaseEvent event) { if(LOG.isDebugEnabled()){ - LOG.debug("receive {} ", event.toString()); + LOG.debug("send {} ", event.toString()); } try{ ActorRef saga; diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index c1a61a6..82ae48a 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorSystem; import com.google.common.eventbus.EventBus; import java.util.UUID; +import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.junit.Test; @@ -46,11 +47,10 @@ import org.springframework.test.context.junit4.SpringRunner; public class SagaIntegrationTest { @Autowired - @Qualifier("sagaEventBus") - EventBus sagaEventBus; - - @Autowired ActorSystem system; + + @Autowired + SagaEventActorEventSender sagaEventActorEventSender; @Test public void successfulTest() { @@ -59,7 +59,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -79,7 +79,7 @@ public class SagaIntegrationTest { final String globalTxId = UUID.randomUUID().toString(); final String localTxId_1 = UUID.randomUUID().toString(); SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { @@ -99,7 +99,7 @@ public class SagaIntegrationTest { final String localTxId_1 = UUID.randomUUID().toString(); final String localTxId_2 = UUID.randomUUID().toString(); SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -120,7 +120,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -142,7 +142,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -164,7 +164,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -186,7 +186,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -208,7 +208,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -231,7 +231,7 @@ public class SagaIntegrationTest { final String localTxId_3 = UUID.randomUUID().toString(); final int timeout = 5; // second SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(timeout + 2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -253,7 +253,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -275,7 +275,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); @@ -297,7 +297,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); + sagaEventActorEventSender.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java index e6fa86a..17589e9 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java @@ -29,6 +29,7 @@ import javax.annotation.PreDestroy; import com.google.common.eventbus.EventBus; import org.apache.servicecomb.pack.alpha.core.*; +import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService; import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService; import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner; @@ -169,9 +170,9 @@ public class AlphaConfig { @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true") ServerStartable serverStartableMy(GrpcServerConfig serverConfig, Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService, - TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, @Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException { + TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException { ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus, - new GrpcSagaEventService(sagaEventBus, omegaCallbacks), grpcTccEventService); + new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService); new Thread(bootstrap::start).start(); tccPendingTaskRunner.start(); tccEventScanner.start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java index 99b26bf..3cfb931 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java @@ -19,7 +19,6 @@ package org.apache.servicecomb.pack.alpha.server.fsm; import static java.util.Collections.emptyMap; -import com.google.common.eventbus.EventBus; import io.grpc.stub.StreamObserver; import java.lang.invoke.MethodHandles; import java.util.Date; @@ -28,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import kamon.annotation.Trace; import org.apache.servicecomb.pack.alpha.core.OmegaCallback; import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender; import org.apache.servicecomb.pack.common.EventType; import org.apache.servicecomb.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; @@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build(); private final Map<String, Map<String, OmegaCallback>> omegaCallbacks; - private final EventBus sagaEventBus; + private final SagaEventActorEventSender sagaEventActorEventSender; - public GrpcSagaEventService(EventBus sagaEventBus, + public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { - this.sagaEventBus = sagaEventBus; + this.sagaEventActorEventSender = sagaEventActorEventSender; this.omegaCallbacks = omegaCallbacks; } @@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { } if (event != null) { event.setCreateTime(new Date()); - this.sagaEventBus.post(event); + sagaEventActorEventSender.send(event); } responseObserver.onNext(ok ? ALLOW : REJECT); responseObserver.onCompleted();
