This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit f00727e48a1c27244de9f72aaae6ead2c377fc38 Author: CMonkey <[email protected]> AuthorDate: Tue Aug 13 11:47:39 2019 +0800 SCB-1418 add alpha-fsm-channel-kafka module --- alpha/alpha-fsm-channel-kafka/README.md | 17 +++ alpha/alpha-fsm-channel-kafka/pom.xml | 125 +++++++++++++++++++++ .../kafka/KafkaChannelAutoConfiguration.java | 110 ++++++++++++++++++ .../fsm/channel/kafka/KafkaMessageListener.java | 49 ++++++++ .../fsm/channel/kafka/KafkaMessagePublisher.java | 64 +++++++++++ .../src/main/resources/META-INF/spring.factories | 17 +++ .../channel/kafka/test/KafkaActorEventSink.java | 10 ++ .../fsm/channel/kafka/test/KafkaApplication.java | 24 ++++ .../fsm/channel/kafka/test/KafkaChannelTest.java | 72 ++++++++++++ .../src/test/resources/log4j2.xml | 30 +++++ 10 files changed, 518 insertions(+) diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md new file mode 100644 index 0000000..d48070c --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/README.md @@ -0,0 +1,17 @@ +# FSM kafka channel +## Enabled Saga State Machine Module + +Using `alpha.feature.akka.enabled=true` launch Alpha and Omega Side +Using `alpha.feature.akka.channel.type=kafka` launch Alpha and Omega Side + +```properties +alpha.feature.akka.enabled=true +alpha.feature.akka.channel.type=kafka +``` + +setting spring boot kafka +``` +spring.kafka.bootstrap-servers=kafka bootstrap_servers +spring.kafka.consumer.group-id=kafka consumer group id +alpha.feature.akka.channel.kafka.topic= kafka topic name +``` \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/pom.xml b/alpha/alpha-fsm-channel-kafka/pom.xml new file mode 100644 index 0000000..b18babd --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/pom.xml @@ -0,0 +1,125 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>alpha</artifactId> + <groupId>org.apache.servicecomb.pack</groupId> + <version>0.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>alpha-fsm-channel-kafka</artifactId> + <name>Pack::Alpha::Fsm::channel::kafka</name> + + <properties> + <leveldbjni-all.version>1.8</leveldbjni-all.version> + <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-dependencies</artifactId> + <version>${spring.boot.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-persistence_2.12</artifactId> + <version>${akka.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <!-- spring boot --> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-autoconfigure</artifactId> + </dependency> + <dependency> + <groupId>org.apache.servicecomb.pack</groupId> + <artifactId>pack-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.servicecomb.pack</groupId> + <artifactId>alpha-core</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-log4j2</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + + <!-- For testing the artifacts scope are test--> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <exclusions> + <exclusion> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>2.11.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.11.12</version> + <scope>test</scope> + </dependency> +</dependencies> + +</project> diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java new file mode 100644 index 0000000..eca7432 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java @@ -0,0 +1,110 @@ +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; + +import com.google.common.collect.Maps; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.servicecomb.pack.alpha.core.NodeStatus; +import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; + +@Configuration +@ConditionalOnClass(KafkaProperties.class) +@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka") +public class KafkaChannelAutoConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(KafkaChannelAutoConfiguration.class); + + @Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}") + private String topic; + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrap_servers; + + @Value("${spring.kafka.consumer.group-id:servicecomb-pack}") + private String groupId; + + @Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal") + private String trusted_packages; + + @Bean + @ConditionalOnMissingBean + public ProducerFactory<String, Object> producerFactory(){ + Map<String, Object> map = Maps.newHashMap(); + map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); + map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + map.put(ProducerConfig.RETRIES_CONFIG, 0); + map.put(ProducerConfig.BATCH_SIZE_CONFIG, 16304); + map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33354432); + + return new DefaultKafkaProducerFactory<>(map); + } + + @Bean + @ConditionalOnMissingBean + public KafkaTemplate<String, Object> kafkaTemplate(){ + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + @ConditionalOnMissingBean + public ConsumerFactory<String, Object> consumerFactory(){ + Map<String, Object> map = Maps.newHashMap(); + + map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); + map.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); + map.put(JsonDeserializer.TRUSTED_PACKAGES, trusted_packages); + + if(logger.isDebugEnabled()){ + logger.debug("init consumerFactory properties = [{}]", map); + } + return new DefaultKafkaConsumerFactory<>(map); + } + + @Bean + @ConditionalOnMissingBean + public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory(){ + ConcurrentKafkaListenerContainerFactory<String,Object> concurrentKafkaListenerContainerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); + concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(1500L); + + return concurrentKafkaListenerContainerFactory; + } + @Bean + @ConditionalOnMissingBean + public KafkaMessagePublisher kafkaMessagePublisher(KafkaTemplate<String, Object> kafkaTemplate){ + return new KafkaMessagePublisher(topic, kafkaTemplate); + } + + @Bean + @ConditionalOnMissingBean + public KafkaMessageListener kafkaMessageListener(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink){ + return new KafkaMessageListener(actorEventSink); + } +} \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java new file mode 100644 index 0000000..fe6d535 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; + +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; + +public class KafkaMessageListener { + + private static final Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class); + + private ActorEventSink actorEventSink; + + public KafkaMessageListener(ActorEventSink actorEventSink) { + this.actorEventSink = actorEventSink; + } + + @KafkaListener(topics = "${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}") + public void listener(BaseEvent baseEvent){ + if(logger.isDebugEnabled()){ + logger.debug("listener event = [{}]", baseEvent); + } + + try { + actorEventSink.send(baseEvent); + }catch (Exception e){ + logger.error("subscriber Exception = [{}]", e.getMessage(), e); + } + + + } +} \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java new file mode 100644 index 0000000..4b1e511 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; + +import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +public class KafkaMessagePublisher implements MessagePublisher { + + private static final Logger logger = LoggerFactory.getLogger(KafkaMessagePublisher.class); + + private String topic; + private KafkaTemplate<String, Object> kafkaTemplate; + + public KafkaMessagePublisher(String topic, KafkaTemplate<String, Object> kafkaTemplate) { + this.topic = topic; + this.kafkaTemplate = kafkaTemplate; + } + + @Override + public void publish(Object data) { + if(logger.isDebugEnabled()){ + logger.debug("send message [{}] to [{}]", data, topic); + } + ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, data); + + listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { + @Override + public void onFailure(Throwable throwable) { + + if(logger.isDebugEnabled()){ + logger.debug("send message failure [{}]", throwable.getMessage(), throwable); + } + } + + @Override + public void onSuccess(SendResult<String, Object> result) { + if(logger.isDebugEnabled()){ + logger.debug("send success result offset = [{}]", result.getRecordMetadata().offset()); + } + } + }); + } +} \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories b/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..9366e98 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java new file mode 100644 index 0000000..cb08d43 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java @@ -0,0 +1,10 @@ +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test; + +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; + +public class KafkaActorEventSink implements ActorEventSink { + @Override + public void send(BaseEvent event) throws Exception { + } +} diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java new file mode 100644 index 0000000..41d1b61 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java @@ -0,0 +1,24 @@ +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test; + +import org.apache.servicecomb.pack.alpha.core.NodeStatus; +import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class KafkaApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaApplication.class, args); + } + + @Bean(name = "actorEventSink") + public ActorEventSink actorEventSink(){ + return new KafkaActorEventSink(); + } + + @Bean(name = "nodeStatus") + public NodeStatus nodeStatus(){ + return new NodeStatus(NodeStatus.TypeEnum.MASTER); + } +} diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java new file mode 100644 index 0000000..55e6fb4 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java @@ -0,0 +1,72 @@ +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test; + +import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.*; +import java.util.concurrent.TimeUnit; + + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = KafkaApplication.class, + properties = { + "alpha.feature.akka.enabled=true", + "alpha.feature.akka.channel.type=kafka", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.group-id=messageListener" + } +) +@EmbeddedKafka +public class KafkaChannelTest { + @Autowired + private EmbeddedKafkaBroker embeddedKafkaBroker; + + @Autowired + private KafkaMessagePublisher kafkaMessagePublisher; + + @Before + public void setup(){ + } + @Test + public void testProducer(){ + + String globalTxId = UUID.randomUUID().toString().replaceAll("-", ""); + String localTxId_1 = UUID.randomUUID().toString().replaceAll("-", ""); + String localTxId_2 = UUID.randomUUID().toString().replaceAll("-", ""); + String localTxId_3 = UUID.randomUUID().toString().replaceAll("-", ""); + + buildData(globalTxId, localTxId_1, localTxId_2, localTxId_3).forEach(baseEvent -> kafkaMessagePublisher.publish(baseEvent)); + + try { + TimeUnit.SECONDS.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + private List<BaseEvent> buildData(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ + List<BaseEvent> sagaEvents = new ArrayList<>(); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + return sagaEvents; + } +} diff --git a/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml b/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml new file mode 100644 index 0000000..8c2def9 --- /dev/null +++ b/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="debug"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration>
