This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit a5c3e3e16bafa25bdd7b0b73a23d970a2ab0d00c Author: maheshrajus <mahesh.somalar...@huawei.com> AuthorDate: Fri Aug 24 17:58:23 2018 +0530 [SCB-868] Add Kamon metrics to Alpha Server Add Kamon metrics to Alpha Server:Review comments fix & system metrics enabled Add Kamon metrics to Alpha Server:Review comments fix & System metrics enabled --- .../servicecomb/saga/alpha/core/EventScanner.java | 24 +++++- .../saga/alpha/server/AlphaApplication.java | 9 +++ .../saga/alpha/server/AlphaEventController.java | 6 ++ .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 6 ++ .../src/main/resources/META-INF/aop.xml | 25 ++++++ .../src/main/resources/application.conf | 88 ++++++++++++++++++++++ alpha/pom.xml | 46 +++++++++++ saga-spring/pom.xml | 1 + 8 files changed, 201 insertions(+), 4 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 298274c..0a15ad0 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -28,21 +28,32 @@ import java.lang.invoke.MethodHandles; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import kamon.annotation.EnableKamon; +import kamon.annotation.Trace; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@EnableKamon public class EventScanner implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final byte[] EMPTY_PAYLOAD = new byte[0]; private final ScheduledExecutorService scheduler; + private final TxEventRepository eventRepository; + private final CommandRepository commandRepository; + private final TxTimeoutRepository timeoutRepository; + private final OmegaCallback omegaCallback; + private final int eventPollingInterval; private 
long nextEndedEventId; + private long nextCompensatedEventId; public EventScanner(ScheduledExecutorService scheduler, @@ -81,6 +92,7 @@ public class EventScanner implements Runnable { MILLISECONDS); } + @Trace("findTimeoutEvents") private void findTimeoutEvents() { eventRepository.findTimeoutEvents() .forEach(event -> { @@ -93,6 +105,7 @@ public class EventScanner implements Runnable { timeoutRepository.markTimeoutAsDone(); } + @Trace("saveUncompensatedEventsToCommands") private void saveUncompensatedEventsToCommands() { eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name()) .forEach(event -> { @@ -102,6 +115,7 @@ public class EventScanner implements Runnable { }); } + @Trace("updateCompensationStatus") private void updateCompensatedCommands() { eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId) .ifPresent(event -> { @@ -111,6 +125,7 @@ public class EventScanner implements Runnable { }); } + @Trace("deleteDuplicateSagaEndedEvents") private void deleteDuplicateSagaEndedEvents() { try { eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); @@ -128,6 +143,7 @@ public class EventScanner implements Runnable { markSagaEnded(event); } + @Trace("abortTimeoutEvents") private void abortTimeoutEvents() { timeoutRepository.findFirstTimeout().forEach(timeout -> { LOG.info("Found timeout event {} to abort", timeout); @@ -141,6 +157,7 @@ public class EventScanner implements Runnable { }); } + @Trace("updateTransactionStatus") private void updateTransactionStatus() { eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents); } @@ -184,6 +201,7 @@ public class EventScanner implements Runnable { EMPTY_PAYLOAD); } + @Trace("compensate") private void compensate() { commandRepository.findFirstCommandToCompensate() .forEach(command -> { @@ -204,8 +222,7 @@ public class EventScanner implements Runnable { command.parentTxId(), TxStartedEvent.name(), 
command.compensationMethod(), - command.payloads() - ); + command.payloads()); } private TxTimeout txTimeoutOf(TxEvent event) { @@ -218,7 +235,6 @@ public class EventScanner implements Runnable { event.parentTxId(), event.type(), event.expiryTime(), - NEW.name() - ); + NEW.name()); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java index 6967872..72bf5fb 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java @@ -17,12 +17,21 @@ package org.apache.servicecomb.saga.alpha.server; +import javax.annotation.PreDestroy; + +import kamon.Kamon; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class AlphaApplication { public static void main(String[] args) { + Kamon.start(); SpringApplication.run(AlphaApplication.class, args); } + + @PreDestroy + void shutdown() { + Kamon.shutdown(); + } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java index b85cfbc..56167e0 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java @@ -34,6 +34,10 @@ import org.springframework.web.bind.annotation.RequestMapping; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import kamon.annotation.EnableKamon; +import kamon.annotation.Trace; + +@EnableKamon @Controller @RequestMapping("/") class 
AlphaEventController { @@ -45,6 +49,7 @@ class AlphaEventController { this.eventRepository = eventRepository; } + @Trace("getEvents") @GetMapping(value = "/events") ResponseEntity<Collection<TxEventVo>> events() { LOG.info("Get the events request"); @@ -57,6 +62,7 @@ class AlphaEventController { return ResponseEntity.ok(eventVos); } + @Trace("deleteEvents") @DeleteMapping("/events") ResponseEntity<String> clear() { eventRepository.deleteAll(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index 1146819..04e017d 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -26,6 +26,8 @@ import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import kamon.annotation.EnableKamon; +import kamon.annotation.Trace; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; @@ -37,6 +39,7 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEvent import io.grpc.stub.StreamObserver; +@EnableKamon class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build(); @@ -53,6 +56,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { } @Override + @Trace("alphaConnected") public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) { omegaCallbacks .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>()) @@ -61,6 +65,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { // TODO: 2018/1/5 
connect is async and disconnect is sync, meaning callback may not be registered on disconnected @Override + @Trace("alphaDisconnected") public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap()) .remove(request.getInstanceId()); @@ -74,6 +79,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { } @Override + @Trace("ontransactionEvent") public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { boolean ok = txConsistentService.handle(new TxEvent( message.getServiceName(), diff --git a/alpha/alpha-server/src/main/resources/META-INF/aop.xml b/alpha/alpha-server/src/main/resources/META-INF/aop.xml new file mode 100644 index 0000000..c481036 --- /dev/null +++ b/alpha/alpha-server/src/main/resources/META-INF/aop.xml @@ -0,0 +1,25 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <weaver options="-Xlint:ignore"> + <include within="org.apache.servicecomb.saga..*"/> + <exclude within="org.aspectj.*"/> + </weaver> +</aspectj> diff --git a/alpha/alpha-server/src/main/resources/application.conf b/alpha/alpha-server/src/main/resources/application.conf new file mode 100644 index 0000000..993979e --- /dev/null +++ b/alpha/alpha-server/src/main/resources/application.conf @@ -0,0 +1,88 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +############################### +# Kamon related configuration # +############################### + +kamon { + + metric { + filters { + trace.includes = ["**"] + trace-segment.includes = ["**"] + trace-segment.excludes = [] + akka-actor.includes = [] + akka-actor.excludes = ["**"] + akka-dispatcher.includes = [] + akka-dispatcher.excludes = ["**"] + } + } + + show-aspectj-missing-warning = no + + modules { + kamon-annotation { + auto-start = no + requires-aspectj = yes + } + + kamon-log-reporter { + auto-start = no + requires-aspectj = no + } + + kamon-statsd { + auto-start = no + requires-aspectj = no + } + } + + statsd { + # Hostname and port on which your StatsD is running. Remember that StatsD packets are sent using UDP, so + # Kamon will not warn about unreachable hosts and/or closed ports; your data simply won't go anywhere. + hostname = "localhost" + port = 8125 + + # Interval between metrics data flushes to StatsD. Its value must be equal to or greater than the + # kamon.metric.tick-interval setting. + flush-interval = 10 second + + # Subscription patterns used to select which metrics will be pushed to StatsD. Note that, first, metrics + # collection for your desired entities must be activated under the kamon.metric.filters settings. 
+ includes { + actor = [] + trace = ["*"] + trace-segment = ["*"] + dispatcher = [] + } + + subscriptions { + histogram = ["**"] + min-max-counter = ["**"] + gauge = ["**"] + counter = ["**"] + trace = ["**"] + trace-segment = ["**"] + akka-actor = [] + akka-dispatcher = [] + akka-router = [] + system-metric = ["**"] + http-server = [] + } + } +} diff --git a/alpha/pom.xml b/alpha/pom.xml index 64b4b89..738b956 100644 --- a/alpha/pom.xml +++ b/alpha/pom.xml @@ -35,6 +35,21 @@ <module>alpha-server</module> </modules> + <dependencies> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.12</artifactId> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-annotation_2.12</artifactId> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.12</artifactId> + </dependency> + </dependencies> + <build> <plugins> <!-- TODO need to clean up the states of AlphaServer --> @@ -47,4 +62,35 @@ </plugin> </plugins> </build> + + <profiles> + <profile> + <id>perf</id> + <dependencies> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-log-reporter_2.12</artifactId> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-statsd_2.12</artifactId> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-autoweave_2.12</artifactId> + <version>0.6.5</version> + </dependency> + <dependency> + <groupId>org.aspectj</groupId> + <artifactId>aspectjweaver</artifactId> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-system-metrics_2.12</artifactId> + <version>${kamon.version}</version> + </dependency> + </dependencies> + </profile> + </profiles> + </project> diff --git a/saga-spring/pom.xml b/saga-spring/pom.xml index b996e23..fc93b56 100755 --- a/saga-spring/pom.xml +++ b/saga-spring/pom.xml @@ -220,6 +220,7 @@ <dependency> <groupId>io.kamon</groupId> <artifactId>kamon-autoweave_2.12</artifactId> + 
<version>0.6.5</version> </dependency> <dependency> <groupId>org.aspectj</groupId>