This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push: new ecc0f94 Zipkin receiver in SkyWalking collector (#1273) ecc0f94 is described below commit ecc0f944d459aea437c800bdf50e4d28c4bbd846 Author: 吴晟 Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Mon Jun 18 23:36:15 2018 +0800 Zipkin receiver in SkyWalking collector (#1273) * Add Zipkin receiver --- README.md | 13 +- README_ZH.md | 22 +- .../handler/InstanceDiscoveryServiceHandler.java | 26 +- .../handler/naming/AgentGRPCNamingHandler.java | 5 +- .../InstanceDiscoveryServiceHandlerTest.java | 47 +- .../handler/ApplicationRegisterServletHandler.java | 4 +- .../handler/InstanceDiscoveryServletHandler.java | 10 +- .../handler/InstanceHeartBeatServletHandler.java | 4 +- .../NetworkAddressRegisterServletHandler.java | 4 +- .../ServiceNameDiscoveryServiceHandler.java | 4 +- .../handler/TraceSegmentServletHandler.java | 4 +- .../handler/naming/AgentJettyNamingHandler.java | 4 +- .../register/define/service/AgentOsInfo.java | 84 ++++ .../define/service/IInstanceIDService.java | 2 +- .../provider/service/InstanceIDService.java | 5 +- .../parser/provider/parser/SegmentParse.java | 35 +- apm-collector/apm-collector-boot/pom.xml | 8 +- .../src/main/resources/application.yml | 8 +- .../apm/collector/server/jetty/JettyHandler.java | 157 +------ .../{JettyHandler.java => JettyJsonHandler.java} | 24 +- .../apm-collector-thirdparty-receiver/pom.xml | 37 ++ .../receiver-zipkin/docs/README.md | 25 ++ .../receiver-zipkin/pom.xml | 44 ++ .../receiver-zipkin/receiver-zipkin-define/pom.xml | 33 ++ .../zipkin/define/ZipkinReceiverModule.java} | 24 +- ...ywalking.apm.collector.core.module.ModuleDefine | 19 + .../receiver-zipkin-provider/pom.xml | 68 +++ .../zipkin/provider/Receiver2AnalysisBridge.java | 48 +++ .../receiver/zipkin/provider/RegisterServices.java | 78 ++++ .../zipkin/provider/ZipkinReceiverConfig.java} | 28 +- .../zipkin/provider/ZipkinReceiverProvider.java | 91 ++++ .../zipkin/provider/cache/CacheFactory.java} | 27 +- .../zipkin/provider/cache/ISpanCache.java} | 12 +- .../provider/cache/caffeine/CaffeineSpanCache.java | 99 +++++ .../zipkin/provider/data/SkyWalkingTrace.java | 58 +++ .../receiver/zipkin/provider/data/ZipkinTrace.java | 63 +++ .../zipkin/provider/handler/SpanProcessor.java | 54 +++ .../provider/handler/SpanV1JettyHandler.java | 71 +++ .../provider/handler/SpanV2JettyHandler.java | 73 ++++ .../zipkin/provider/transform/SegmentBuilder.java | 477 +++++++++++++++++++++ .../provider/transform/SegmentListener.java} | 13 +- .../transform/Zipkin2SkyWalkingTransfer.java | 67 +++ ...alking.apm.collector.core.module.ModuleProvider | 19 + .../transform/SpringSleuthSegmentBuilderTest.java | 196 +++++++++ .../collector/ui/jetty/handler/GraphQLHandler.java | 4 +- .../jetty/handler/naming/UIJettyNamingHandler.java | 4 +- apm-collector/pom.xml | 7 + apm-dist/release-docs/LICENSE | 1 + docs/README.md | 1 + docs/en/Incubating/Abstract.md | 1 + 50 files changed, 1913 insertions(+), 299 deletions(-) diff --git a/README.md b/README.md index 5dd896a..52de90d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Apache SkyWalking | [中文](README_ZH.md) +Apache SkyWalking ========== <img src="https://skywalkingtest.github.io/page-resources/3.0/skywalking.png" alt="Sky Walking logo" height="90px" align="right" /> @@ -40,6 +40,9 @@ including: - Modern and cool Web UI - Log integration - Alarm for slow or unstable(low SLA) application, instance and service +- [**Incubating**] Support accepting other tracer data formats. + - Zipkin JSON, Thrift, Protobuf v1 and v2 formats, powered by [OpenZipkin](https://github.com/openzipkin/zipkin) libs + - Jaeger in [Zipkin Thrift or JSON v1/v2 formats](https://github.com/jaegertracing/jaeger#backwards-compatibility-with-zipkin) # Document - [Documents in English](docs/README.md) @@ -53,7 +56,6 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU # Live Demo - Host in Beijing. [goto](http://49.4.12.44:8080/) -- Host in HK. [goto](http://159.138.0.181:8080/) # Screenshot <img src="https://skywalkingtest.github.io/page-resources/5.0.0-beta/Dashboard.png"/> @@ -61,11 +63,8 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU - [See all screenshots](/docs/Screenshots.md) -# Test reports -- Automatic integration test reports - - [Java Agent test report](https://github.com/SkywalkingTest/agent-integration-test-report) -- Performance test reports - - [Java Agent test report](https://skywalkingtest.github.io/Agent-Benchmarks/) +# Compiling project +Follow this [document](https://github.com/apache/incubator-skywalking/blob/master/docs/en/How-to-build.md). # Contact Us * Submit an issue diff --git a/README_ZH.md b/README_ZH.md index a61cd47..d93527f 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -38,6 +38,9 @@ Apache SkyWalking | [English](README.md) - 现代化Web UI - 日志集成 - 应用、实例和服务的告警 +- [**Incubating**]支持接口其他探针的数据 + - 接受Zipkin v1 v2格式数据,采用JSON, Thrift, Protobuf序列化方式。Powered by [OpenZipkin](https://github.com/openzipkin/zipkin) libs + - 接受Jaeger 使用 [Zipkin Thrift 或 JSON v1/v2 格式](https://github.com/jaegertracing/jaeger#backwards-compatibility-with-zipkin) # Document - [英文文档](docs/README.md) @@ -52,7 +55,6 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU # Live Demo - 北京服务器. [前往](http://49.4.12.44:8080/) -- 香港服务器. [前往](http://159.138.0.181:8080/) # Screenshot <img src="https://skywalkingtest.github.io/page-resources/5.0.0-beta/Dashboard.png"/> @@ -60,16 +62,8 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU - [查看所有系统截图](/docs/Screenshots.md) -# Test reports -- 自动化集成测试报告 - - [Java探针测试报告](https://github.com/SkywalkingTest/agent-integration-test-report) -- 性能测试报告 - - [Java探针测试报告](https://skywalkingtest.github.io/Agent-Benchmarks/) - -# Users -<img src="https://skywalkingtest.github.io/page-resources/users/users-2018-06-07.png"/> - -[报告新的用户案例](https://github.com/apache/incubator-skywalking/issues/443) +# Compiling project +查看[编译指南](https://github.com/apache/incubator-skywalking/blob/master/docs/cn/How-to-build-CN.md) # Contact Us * 直接提交Issue @@ -77,5 +71,11 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU * [Gitter](https://gitter.im/openskywalking/Lobby) * QQ群: 392443393 +# Users +<img src="https://skywalkingtest.github.io/page-resources/users/users-2018-06-07.png"/> + +[报告新的用户案例](https://github.com/apache/incubator-skywalking/issues/443) + + # License [Apache 2.0 License.](/LICENSE) diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java index 7f1f9c9..9fec7e9 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java @@ -18,12 +18,11 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; import io.grpc.stub.StreamObserver; import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule; import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule; +import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo; import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler; @@ -54,7 +53,14 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp @Override public void registerInstance(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) { - int instanceId = instanceIDService.getOrCreateByAgentUUID(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), buildOsInfo(request.getOsinfo())); + OSInfo osinfo = request.getOsinfo(); + AgentOsInfo agentOsInfo = new AgentOsInfo(); + agentOsInfo.setHostname(osinfo.getHostname()); + agentOsInfo.setOsName(osinfo.getOsName()); + agentOsInfo.setProcessNo(osinfo.getProcessNo()); + agentOsInfo.setIpv4s(osinfo.getIpv4SList()); + + int instanceId = instanceIDService.getOrCreateByAgentUUID(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), agentOsInfo); ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder(); builder.setApplicationId(request.getApplicationId()); builder.setApplicationInstanceId(instanceId); @@ -69,18 +75,4 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp responseObserver.onNext(Downstream.getDefaultInstance()); responseObserver.onCompleted(); } - - private String buildOsInfo(OSInfo osinfo) { - JsonObject osInfoJson = new JsonObject(); - osInfoJson.addProperty("osName", osinfo.getOsName()); - osInfoJson.addProperty("hostName", osinfo.getHostname()); - osInfoJson.addProperty("processId", osinfo.getProcessNo()); - - JsonArray ipv4Array = new JsonArray(); - for (String ipv4 : osinfo.getIpv4SList()) { - ipv4Array.add(ipv4); - } - osInfoJson.add("ipv4s", ipv4Array); - return osInfoJson.toString(); - } } diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/naming/AgentGRPCNamingHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/naming/AgentGRPCNamingHandler.java index 3d5ce31..cf505d0 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/naming/AgentGRPCNamingHandler.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/naming/AgentGRPCNamingHandler.java @@ -16,7 +16,6 @@ * */ - package org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming; import com.google.gson.JsonArray; @@ -24,12 +23,12 @@ import com.google.gson.JsonElement; import java.util.Set; import javax.servlet.http.HttpServletRequest; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; /** * @author peng-yongsheng */ -public class AgentGRPCNamingHandler extends JettyHandler { +public class AgentGRPCNamingHandler extends JettyJsonHandler { private final AgentGRPCNamingListener namingListener; diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandlerTest.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandlerTest.java index d3ab080..a7484c8 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandlerTest.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandlerTest.java @@ -18,11 +18,16 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler; import io.grpc.stub.StreamObserver; +import java.util.UUID; import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; import org.apache.skywalking.apm.collector.core.module.MockModule; import org.apache.skywalking.apm.collector.core.module.ModuleManager; -import org.apache.skywalking.apm.network.proto.*; +import org.apache.skywalking.apm.network.proto.ApplicationInstance; +import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat; +import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping; +import org.apache.skywalking.apm.network.proto.Downstream; +import org.apache.skywalking.apm.network.proto.OSInfo; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -31,11 +36,9 @@ import org.mockito.Mock; import org.mockito.internal.util.reflection.Whitebox; import org.mockito.runners.MockitoJUnitRunner; -import java.util.UUID; - -import static org.junit.Assert.*; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.when; @@ -65,22 +68,22 @@ public class InstanceDiscoveryServiceHandlerTest { @Test public void registerInstance() { ApplicationInstance applicationInstance = ApplicationInstance.newBuilder() - .setAgentUUID(UUID.randomUUID().toString()) - .setApplicationId(10) - .setRegisterTime(System.currentTimeMillis()) - .setOsinfo( - OSInfo.newBuilder() - .setOsName("MAC OS") - .setHostname("test") - .addIpv4S("127.0.0.1") - .setProcessNo(123456) - .build() - ).build(); - when(instanceIDService.getOrCreateByAgentUUID(anyInt(), anyString(), anyLong(), anyString())).thenReturn(100); + .setAgentUUID(UUID.randomUUID().toString()) + .setApplicationId(10) + .setRegisterTime(System.currentTimeMillis()) + .setOsinfo( + OSInfo.newBuilder() + .setOsName("MAC OS") + .setHostname("test") + .addIpv4S("127.0.0.1") + .setProcessNo(123456) + .build() + ).build(); + when(instanceIDService.getOrCreateByAgentUUID(anyInt(), anyString(), anyLong(), anyObject())).thenReturn(100); instanceDiscoveryServiceHandler.registerInstance(applicationInstance, new StreamObserver<ApplicationInstanceMapping>() { @Override public void onNext(ApplicationInstanceMapping applicationInstanceMapping) { - Assert.assertEquals(100,applicationInstanceMapping.getApplicationInstanceId()); + Assert.assertEquals(100, applicationInstanceMapping.getApplicationInstanceId()); } @Override @@ -98,13 +101,13 @@ public class InstanceDiscoveryServiceHandlerTest { @Test public void heartbeat() { ApplicationInstanceHeartbeat heartbeat = ApplicationInstanceHeartbeat.newBuilder() - .setApplicationInstanceId(100) - .setHeartbeatTime(System.currentTimeMillis()) - .build(); + .setApplicationInstanceId(100) + .setHeartbeatTime(System.currentTimeMillis()) + .build(); instanceDiscoveryServiceHandler.heartbeat(heartbeat, new StreamObserver<Downstream>() { @Override public void onNext(Downstream downstream) { - Assert.assertEquals(Downstream.getDefaultInstance(),downstream); + Assert.assertEquals(Downstream.getDefaultInstance(), downstream); } @Override @@ -118,4 +121,4 @@ public class InstanceDiscoveryServiceHandlerTest { } }); } -} \ No newline at end of file +} diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ApplicationRegisterServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ApplicationRegisterServletHandler.java index c60f4b6..100692d 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ApplicationRegisterServletHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ApplicationRegisterServletHandler.java @@ -28,14 +28,14 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ApplicationRegisterServletHandler extends JettyHandler { +public class ApplicationRegisterServletHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class); diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceDiscoveryServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceDiscoveryServletHandler.java index d88d86c..d0a1a08 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceDiscoveryServletHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceDiscoveryServletHandler.java @@ -24,17 +24,18 @@ import com.google.gson.JsonObject; import java.io.IOException; import javax.servlet.http.HttpServletRequest; import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule; +import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo; import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class InstanceDiscoveryServletHandler extends JettyHandler { +public class InstanceDiscoveryServletHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class); @@ -66,9 +67,10 @@ public class InstanceDiscoveryServletHandler extends JettyHandler { int applicationId = instance.get(APPLICATION_ID).getAsInt(); String agentUUID = instance.get(AGENT_UUID).getAsString(); long registerTime = instance.get(REGISTER_TIME).getAsLong(); - JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject(); + JsonObject osInfoJson = instance.get(OS_INFO).getAsJsonObject(); + AgentOsInfo osInfo = gson.fromJson(osInfoJson, AgentOsInfo.class); - int instanceId = instanceIDService.getOrCreateByAgentUUID(applicationId, agentUUID, registerTime, osInfo.toString()); + int instanceId = instanceIDService.getOrCreateByAgentUUID(applicationId, agentUUID, registerTime, osInfo); responseJson.addProperty(APPLICATION_ID, applicationId); responseJson.addProperty(INSTANCE_ID, instanceId); } catch (IOException e) { diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceHeartBeatServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceHeartBeatServletHandler.java index acc479a..32841b9 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceHeartBeatServletHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/InstanceHeartBeatServletHandler.java @@ -27,14 +27,14 @@ import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetric import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class InstanceHeartBeatServletHandler extends JettyHandler { +public class InstanceHeartBeatServletHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class); diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java index 318c3fe..4fc8852 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java @@ -28,14 +28,14 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class NetworkAddressRegisterServletHandler extends JettyHandler { +public class NetworkAddressRegisterServletHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(NetworkAddressRegisterServletHandler.class); diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ServiceNameDiscoveryServiceHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ServiceNameDiscoveryServiceHandler.java index 6ddce79..4bc4ac7 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ServiceNameDiscoveryServiceHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/ServiceNameDiscoveryServiceHandler.java @@ -28,14 +28,14 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ServiceNameDiscoveryServiceHandler extends JettyHandler { +public class ServiceNameDiscoveryServiceHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class); diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java index fec68ef..169329b 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java @@ -29,14 +29,14 @@ import org.apache.skywalking.apm.collector.analysis.segment.parser.define.Analys import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class TraceSegmentServletHandler extends JettyHandler { +public class TraceSegmentServletHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class); diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/naming/AgentJettyNamingHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/naming/AgentJettyNamingHandler.java index cf63350..f9114c6 100644 --- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/naming/AgentJettyNamingHandler.java +++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/naming/AgentJettyNamingHandler.java @@ -24,12 +24,12 @@ import com.google.gson.JsonElement; import java.util.Set; import javax.servlet.http.HttpServletRequest; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; /** * @author peng-yongsheng */ -public class AgentJettyNamingHandler extends JettyHandler { +public class AgentJettyNamingHandler extends JettyJsonHandler { private final AgentJettyNamingListener namingListener; diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/AgentOsInfo.java b/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/AgentOsInfo.java new file mode 100644 index 0000000..b16f843 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/AgentOsInfo.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.register.define.service; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.util.List; + +/** + * @author wusheng + */ +public class AgentOsInfo { + private String osName; + private String hostname; + private int processNo; + private List<String> ipv4s; + + public AgentOsInfo() { + } + + public String getOsName() { + return osName; + } + + public void setOsName(String osName) { + this.osName = osName; + } + + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public int getProcessNo() { + return processNo; + } + + public void setProcessNo(int processNo) { + this.processNo = processNo; + } + + public List<String> getIpv4s() { + return ipv4s; + } + + public void setIpv4s(List<String> ipv4s) { + this.ipv4s = ipv4s; + } + + @Override public String toString() { + JsonObject osInfoJson = new JsonObject(); + osInfoJson.addProperty("osName", this.getOsName()); + osInfoJson.addProperty("hostName", this.getHostname()); + osInfoJson.addProperty("processId", this.getProcessNo()); + + JsonArray ipv4Array = new JsonArray(); + if (this.getIpv4s() != null) { + for (String ipv4 : this.getIpv4s()) { + ipv4Array.add(ipv4); + } + } + osInfoJson.add("ipv4s", ipv4Array); + return osInfoJson.toString(); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java b/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java index ed84f05..8959b73 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java +++ b/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java @@ -24,7 +24,7 @@ import org.apache.skywalking.apm.collector.core.module.Service; * @author peng-yongsheng */ public interface IInstanceIDService extends Service { - int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo); + int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo); int getOrCreateByAddressId(int applicationId, int addressId, long registerTime); } diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java index 293bad2..b4cbf98 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java +++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java @@ -19,6 +19,7 @@ package org.apache.skywalking.apm.collector.analysis.register.provider.service; import org.apache.skywalking.apm.collector.analysis.register.define.graph.GraphIdDefine; +import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo; import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; import org.apache.skywalking.apm.collector.cache.CacheModule; import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService; @@ -71,7 +72,7 @@ public class InstanceIDService implements IInstanceIDService { return applicationCacheService; } - @Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo) { + @Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo) { logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo); int instanceId = getInstanceCacheService().getInstanceIdByAgentUUID(applicationId, agentUUID); @@ -84,7 +85,7 @@ public class InstanceIDService implements IInstanceIDService { instance.setRegisterTime(registerTime); instance.setHeartBeatTime(registerTime); instance.setInstanceId(0); - instance.setOsInfo(osInfo); + instance.setOsInfo(osInfo.toString()); instance.setIsAddress(BooleanUtils.FALSE); instance.setAddressId(Const.NONE); diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java index 87b1e39..aaa0f83 100644 --- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java +++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java @@ -19,20 +19,32 @@ package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser; import com.google.protobuf.InvalidProtocolBufferException; -import java.util.*; -import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentCoreInfo; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentDecorator; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator; import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine; import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*; import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService; -import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.*; +import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.ReferenceIdExchanger; +import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardization; +import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SpanIdExchanger; import org.apache.skywalking.apm.collector.core.UnexpectedException; import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric; -import org.apache.skywalking.apm.collector.core.graph.*; +import org.apache.skywalking.apm.collector.core.graph.Graph; +import org.apache.skywalking.apm.collector.core.graph.GraphManager; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; import org.apache.skywalking.apm.collector.storage.table.segment.Segment; -import org.apache.skywalking.apm.network.proto.*; -import org.slf4j.*; +import org.apache.skywalking.apm.network.proto.SpanType; +import org.apache.skywalking.apm.network.proto.TraceSegmentObject; +import org.apache.skywalking.apm.network.proto.UniqueId; +import org.apache.skywalking.apm.network.proto.UpstreamSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; /** * @author peng-yongsheng @@ -193,7 +205,7 @@ public class SegmentParse { private void notifyExitListener(SpanDecorator spanDecorator) { spanListeners.forEach(listener -> { if (listener.containsPoint(SpanListener.Point.Exit)) { - ((ExitSpanListener)listener).parseExit(spanDecorator, segmentCoreInfo); + ((ExitSpanListener) listener).parseExit(spanDecorator, segmentCoreInfo); } }); } @@ -202,7 +214,7 @@ public class SegmentParse { private void notifyEntryListener(SpanDecorator spanDecorator) { spanListeners.forEach(listener -> { if (listener.containsPoint(SpanListener.Point.Entry)) { - ((EntrySpanListener)listener).parseEntry(spanDecorator, segmentCoreInfo); + ((EntrySpanListener) listener).parseEntry(spanDecorator, segmentCoreInfo); } }); } @@ -211,7 +223,7 @@ public class SegmentParse { private void notifyLocalListener(SpanDecorator spanDecorator) { spanListeners.forEach(listener -> { if (listener.containsPoint(SpanListener.Point.Local)) { - ((LocalSpanListener)listener).parseLocal(spanDecorator, segmentCoreInfo); + ((LocalSpanListener) listener).parseLocal(spanDecorator, segmentCoreInfo); } }); } @@ -220,7 +232,7 @@ public class SegmentParse { private void notifyFirstListener(SpanDecorator spanDecorator) { spanListeners.forEach(listener -> { if (listener.containsPoint(SpanListener.Point.First)) { - ((FirstSpanListener)listener).parseFirst(spanDecorator, segmentCoreInfo); + ((FirstSpanListener) listener).parseFirst(spanDecorator, segmentCoreInfo); } }); } @@ -229,7 +241,7 @@ public class SegmentParse { private void notifyGlobalsListener(UniqueId uniqueId) { spanListeners.forEach(listener -> { if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) { - ((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo); + ((GlobalTraceIdsListener) listener).parseGlobalTraceId(uniqueId, segmentCoreInfo); } }); } @@ -238,4 +250,5 @@ public class SegmentParse { private void createSpanListeners() { listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager))); } + } diff --git a/apm-collector/apm-collector-boot/pom.xml b/apm-collector/apm-collector-boot/pom.xml index 22981fc..6a06ebc 100644 --- a/apm-collector/apm-collector-boot/pom.xml +++ b/apm-collector/apm-collector-boot/pom.xml @@ -185,6 +185,12 @@ <version>${project.version}</version> </dependency> <!-- alarm provider --> + <!-- zipkin receiver--> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>receiver-zipkin-provider</artifactId> + <version>${project.version}</version> + </dependency> <!-- instrument provided dependency --> <dependency> @@ -246,4 +252,4 @@ </plugin> </plugins> </build> -</project> \ No newline at end of file +</project> diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml index 985ddbb..05f9e61 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/application.yml +++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml @@ -101,4 +101,10 @@ configuration: thermodynamicResponseTimeStep: 50 thermodynamicCountOfResponseTimeSteps: 40 # max collection's size of worker cache collection, setting it smaller when collector OutOfMemory crashed. - workerCacheMaxSize: 10000 \ No newline at end of file + workerCacheMaxSize: 10000 +#receiver_zipkin: +# default: +# host: localhost +# port: 9411 +# contextPath: / +# \ No newline at end of file diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java index 1f4e66e..6738ad4 100644 --- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java +++ b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java @@ -18,167 +18,12 @@ package org.apache.skywalking.apm.collector.server.jetty; -import com.google.gson.JsonElement; -import org.apache.skywalking.apm.collector.server.ServerHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.*; import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Enumeration; - -import static java.util.Objects.nonNull; +import org.apache.skywalking.apm.collector.server.ServerHandler; /** * @author peng-yongsheng */ public abstract class JettyHandler extends HttpServlet implements ServerHandler { - - private static final Logger logger = LoggerFactory.getLogger(JettyHandler.class); - public abstract String pathSpec(); - - @Override - protected final void doGet(HttpServletRequest req, HttpServletResponse resp) { - try { - reply(resp, doGet(req)); - } catch (ArgumentsParseException | IOException e) { - try { - replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST); - } catch (IOException replyException) { - logger.error(replyException.getMessage(), e); - } - } - } - - protected abstract JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException; - - @Override - protected final void doPost(HttpServletRequest req, HttpServletResponse resp) { - try { - reply(resp, doPost(req)); - } catch (ArgumentsParseException | IOException e) { - try { - replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST); - } catch (IOException replyException) { - logger.error(replyException.getMessage(), e); - } - } - } - - protected abstract JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException; - - @Override - protected final void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - super.doHead(req, resp); - } - - @Override protected final long getLastModified(HttpServletRequest req) { - return super.getLastModified(req); - } - - @Override - protected final void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - super.doPut(req, resp); - } - - @Override - protected final void doDelete(HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - super.doDelete(req, resp); - } - - @Override - protected final void doOptions(HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - super.doOptions(req, resp); - } - - @Override - protected final void doTrace(HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - super.doTrace(req, resp); - } - - @Override - protected final void service(HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - super.service(req, resp); - } - - @Override public final void service(ServletRequest req, ServletResponse res) throws ServletException, IOException { - super.service(req, res); - } - - @Override public final void destroy() { - super.destroy(); - } - - @Override public final String getInitParameter(String name) { - return super.getInitParameter(name); - } - - @Override public final Enumeration<String> getInitParameterNames() { - return super.getInitParameterNames(); - } - - @Override public final ServletConfig getServletConfig() { - return super.getServletConfig(); - } - - @Override public final ServletContext getServletContext() { - return super.getServletContext(); - } - - @Override public final String getServletInfo() { - return super.getServletInfo(); - } - - @Override public final void init(ServletConfig config) throws ServletException { - super.init(config); - } - - @Override public final void init() throws ServletException { - super.init(); - } - - @Override public final void log(String msg) { - super.log(msg); - } - - @Override public final void log(String message, Throwable t) { - super.log(message, t); - } - - @Override public final String getServletName() { - return super.getServletName(); - } - - private void reply(HttpServletResponse response, JsonElement resJson) throws IOException { - response.setContentType("application/json"); - response.setCharacterEncoding("utf-8"); - response.setStatus(HttpServletResponse.SC_OK); - - PrintWriter out = response.getWriter(); - if (nonNull(resJson)) { - out.print(resJson); - } - out.flush(); - out.close(); - } - - private void replyError(HttpServletResponse response, String errorMessage, int status) throws IOException { - response.setContentType("application/json"); - response.setCharacterEncoding("utf-8"); - response.setStatus(status); - response.setHeader("error-message", errorMessage); - - PrintWriter out = response.getWriter(); - out.flush(); - out.close(); - } } diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyJsonHandler.java similarity index 95% copy from apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java copy to apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyJsonHandler.java index 1f4e66e..f79f8c7 100644 --- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyHandler.java +++ b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyJsonHandler.java @@ -19,29 +19,27 @@ package org.apache.skywalking.apm.collector.server.jetty; import com.google.gson.JsonElement; -import org.apache.skywalking.apm.collector.server.ServerHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.*; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.Enumeration; +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.nonNull; /** - * @author peng-yongsheng + * @author wusheng */ -public abstract class JettyHandler extends HttpServlet implements ServerHandler { - +public abstract class JettyJsonHandler extends JettyHandler { private static final Logger logger = LoggerFactory.getLogger(JettyHandler.class); - public abstract String pathSpec(); - @Override protected final void doGet(HttpServletRequest req, HttpServletResponse resp) { try { diff --git a/apm-collector/apm-collector-thirdparty-receiver/pom.xml b/apm-collector/apm-collector-thirdparty-receiver/pom.xml new file mode 100644 index 0000000..4626467 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>apm-collector</artifactId> + <groupId>org.apache.skywalking</groupId> + <version>5.0.0-beta2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>apm-collector-thirdparty-receiver</artifactId> + <packaging>pom</packaging> + <modules> + <module>receiver-zipkin</module> + </modules> + + +</project> diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/docs/README.md b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/docs/README.md new file mode 100644 index 0000000..a1c5c6c --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/docs/README.md @@ -0,0 +1,25 @@ +# Zipkin receiver +[Zipkin](http://zipkin.io/) receiver provides the feature to receive span data in Zipkin formats. SkyWalking backend provides +analysis, aggregation and visualization. So the user will not need to learn how SkyWalking auto instrumentation +agents(Java, .NET, node.js) work, or they don't want to change for some reasons, such as Zipkin integration has been completed. + +Zipkin receiver is only an optional features in SkyWalking, even now it is [an incubating feature](../../../../docs/en/Incubating/Abstract.md). + +## Limits +As an incubating feature, it is a prototype. So it has following limits: + +1. Don't try to use SkyWalking native agents and Zipkin's libs in the same distributed system. Considering HEADERs of Zipkin and SkyWalking aren't shared/interoperable, their two will not propagate context for each other. Trace will not continue. +1. Don't support cluster mode. +1. Analysis based on trace will be finished in the certain and given duration. The default assumption is 2 min most. SkyWalking used more complex header and context to avoid this in analysis stage. + +## Open Zipkin receiver +Zipkin receiver is an optional module, and default closed. For open it, add these settings in your `application.yml` in collector +```yaml +receiver_zipkin: + default: + host: localhost + port: 9411 + contextPath: / + expireTime: 20 # Unit is seconds + maxCacheSize: 1000000 # The number of traces in buffer +``` diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/pom.xml b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/pom.xml new file mode 100644 index 0000000..4ed7017 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>apm-collector-thirdparty-receiver</artifactId> + <groupId>org.apache.skywalking</groupId> + <version>5.0.0-beta2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>receiver-zipkin</artifactId> + <packaging>pom</packaging> + <modules> + <module>receiver-zipkin-define</module> + <module>receiver-zipkin-provider</module> + </modules> + + <dependencies> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>apm-collector-core</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/pom.xml b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/pom.xml new file mode 100644 index 0000000..e7d1282 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>receiver-zipkin</artifactId> + <groupId>org.apache.skywalking</groupId> + <version>5.0.0-beta2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>receiver-zipkin-define</artifactId> + + +</project> diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/define/ZipkinReceiverModule.java similarity index 51% copy from apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java copy to apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/define/ZipkinReceiverModule.java index ed84f05..c93c192 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/define/ZipkinReceiverModule.java @@ -16,15 +16,27 @@ * */ -package org.apache.skywalking.apm.collector.analysis.register.define.service; +package org.apache.skywalking.apm.collector.receiver.zipkin.define; -import org.apache.skywalking.apm.collector.core.module.Service; +import org.apache.skywalking.apm.collector.core.module.ModuleDefine; /** - * @author peng-yongsheng + * Zipkin receiver module provides the HTTP, protoc serve for any SDK or agent by following Zipkin format. + * + * At this moment, Zipkin format is not compatible with SkyWalking, especially HEADERs. + * Please don't consider this as a Zipkin-SkyWalking integration, + * it is provided for adding analysis, aggregation and visualization capabilities to zipkin backend. + * + * @author wusheng */ -public interface IInstanceIDService extends Service { - int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo); +public class ZipkinReceiverModule extends ModuleDefine { + public static final String NAME = "receiver_zipkin"; + + @Override public String name() { + return NAME; + } - int getOrCreateByAddressId(int applicationId, int addressId, long registerTime); + @Override public Class[] services() { + return new Class[0]; + } } diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleDefine b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleDefine new file mode 100644 index 0000000..c8608bc --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-define/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleDefine @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.apm.collector.receiver.zipkin.define.ZipkinReceiverModule diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/pom.xml b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/pom.xml new file mode 100644 index 0000000..816a14b --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/pom.xml @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>receiver-zipkin</artifactId> + <groupId>org.apache.skywalking</groupId> + <version>5.0.0-beta2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>receiver-zipkin-provider</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>receiver-zipkin-define</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>collector-jetty-manager-define</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>segment-parser-define</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>register-define</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>metric-define</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> + <groupId>io.zipkin.zipkin2</groupId> + <artifactId>zipkin</artifactId> + </dependency> + </dependencies> + + +</project> diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/Receiver2AnalysisBridge.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/Receiver2AnalysisBridge.java new file mode 100644 index 0000000..5ef1036 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/Receiver2AnalysisBridge.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider; + +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.SegmentListener; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.Zipkin2SkyWalkingTransfer; + +/** + * Send the segments to Analysis module, like receiving segments from native SkyWalking agents. + */ +public class Receiver2AnalysisBridge implements SegmentListener { + private ISegmentParseService segmentParseService; + + public Receiver2AnalysisBridge(ISegmentParseService segmentParseService) { + this.segmentParseService = segmentParseService; + } + + /** + * Add this bridge as listener to Zipkin span transfer. + */ + public void build() { + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this); + } + + @Override + public void notify(SkyWalkingTrace trace) { + trace.toUpstreamSegment().forEach(upstream -> segmentParseService.parse(upstream.build(), ISegmentParseService.Source.Agent)); + + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/RegisterServices.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/RegisterServices.java new file mode 100644 index 0000000..0d3fcc8 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/RegisterServices.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider; + +import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService; + +/** + * @author wusheng + */ +public class RegisterServices { + private IApplicationIDService applicationIDService; + + private IInstanceIDService instanceIDService; + + private INetworkAddressIDService networkAddressIDService; + + private IServiceNameService serviceNameService; + + public RegisterServices( + IApplicationIDService applicationIDService, + IInstanceIDService instanceIDService, + INetworkAddressIDService networkAddressIDService, + IServiceNameService serviceNameService) { + this.applicationIDService = applicationIDService; + this.instanceIDService = instanceIDService; + this.networkAddressIDService = networkAddressIDService; + this.serviceNameService = serviceNameService; + } + + public IApplicationIDService getApplicationIDService() { + return applicationIDService; + } + + public IInstanceIDService getInstanceIDService() { + return instanceIDService; + } + + public INetworkAddressIDService getNetworkAddressIDService() { + return networkAddressIDService; + } + + public IServiceNameService getServiceNameService() { + return serviceNameService; + } + + /** + * @param applicationId + * @param agentUUID in zipkin translation, always means application code. Because no UUID for each process. + * @return + */ + public int getOrCreateApplicationInstanceId(int applicationId, String agentUUID) { + AgentOsInfo agentOsInfo = new AgentOsInfo(); + agentOsInfo.setHostname("N/A"); + agentOsInfo.setOsName("N/A"); + agentOsInfo.setProcessNo(-1); + return getInstanceIDService().getOrCreateByAgentUUID(applicationId, agentUUID, System.currentTimeMillis(), agentOsInfo); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/ZipkinReceiverConfig.java similarity index 56% copy from apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java copy to apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/ZipkinReceiverConfig.java index ed84f05..6e261e9 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/ZipkinReceiverConfig.java @@ -16,15 +16,31 @@ * */ -package org.apache.skywalking.apm.collector.analysis.register.define.service; +package org.apache.skywalking.apm.collector.receiver.zipkin.provider; -import org.apache.skywalking.apm.collector.core.module.Service; +import org.apache.skywalking.apm.collector.server.jetty.JettyServerConfig; /** - * @author peng-yongsheng + * @author wusheng */ -public interface IInstanceIDService extends Service { - int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo); +public class ZipkinReceiverConfig extends JettyServerConfig { + private int expireTime = 20; - int getOrCreateByAddressId(int applicationId, int addressId, long registerTime); + private int maxCacheSize = 1_000_000; + + public int getExpireTime() { + return expireTime; + } + + public void setExpireTime(int expireTime) { + this.expireTime = expireTime; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } } diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/ZipkinReceiverProvider.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/ZipkinReceiverProvider.java new file mode 100644 index 0000000..fd364f4 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/ZipkinReceiverProvider.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider; + +import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule; +import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; +import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService; +import org.apache.skywalking.apm.collector.core.module.*; +import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule; +import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService; +import org.apache.skywalking.apm.collector.receiver.zipkin.define.ZipkinReceiverModule; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler.SpanV2JettyHandler; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.apm.collector.server.jetty.JettyServer; + +/** + * @author wusheng + */ +public class ZipkinReceiverProvider extends ModuleProvider { + public static final String NAME = "default"; + private ZipkinReceiverConfig config; + + public ZipkinReceiverProvider() { + config = new ZipkinReceiverConfig(); + } + + @Override public String name() { + return NAME; + } + + @Override public Class<? extends ModuleDefine> module() { + return ZipkinReceiverModule.class; + } + + @Override public ModuleConfig createConfigBeanIfAbsent() { + return config; + } + + @Override public void prepare() throws ServiceNotProvidedException { + + } + + @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + ModuleDefine moduleDefine = getManager().find(AnalysisRegisterModule.NAME); + RegisterServices registerServices = new RegisterServices(moduleDefine.getService(IApplicationIDService.class), + moduleDefine.getService(IInstanceIDService.class), + moduleDefine.getService(INetworkAddressIDService.class), + moduleDefine.getService(IServiceNameService.class)); + IInstanceHeartBeatService instanceHeartBeatService = getManager().find(AnalysisMetricModule.NAME).getService(IInstanceHeartBeatService.class); + Zipkin2SkyWalkingTransfer.INSTANCE.setRegisterServices(registerServices); + Zipkin2SkyWalkingTransfer.INSTANCE.setInstanceHeartBeatService(instanceHeartBeatService); + + JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class); + JettyServer jettyServer = managerService.createIfAbsent(config.getHost(), config.getPort(), config.getContextPath()); + jettyServer.addHandler(new SpanV2JettyHandler(config, registerServices)); + + ISegmentParseService segmentParseService = getManager().find(AnalysisSegmentParserModule.NAME).getService(ISegmentParseService.class); + Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService); + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge); + } + + @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { + + } + + @Override public String[] requiredModules() { + return new String[] {JettyManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME}; + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/CacheFactory.java similarity index 51% copy from apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java copy to apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/CacheFactory.java index ed84f05..450f82c 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/CacheFactory.java @@ -16,15 +16,30 @@ * */ -package org.apache.skywalking.apm.collector.analysis.register.define.service; +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache; -import org.apache.skywalking.apm.collector.core.module.Service; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.caffeine.CaffeineSpanCache; /** - * @author peng-yongsheng + * @author wusheng */ -public interface IInstanceIDService extends Service { - int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo); +public class CacheFactory { + public static final CacheFactory INSTANCE = new CacheFactory(); - int getOrCreateByAddressId(int applicationId, int addressId, long registerTime); + private ISpanCache implementor; + + private CacheFactory() { + } + + public ISpanCache get(ZipkinReceiverConfig config) { + if (implementor == null) { + synchronized (INSTANCE) { + if (implementor == null) { + implementor = new CaffeineSpanCache(config); + } + } + } + return implementor; + } } diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/ISpanCache.java similarity index 66% copy from apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java copy to apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/ISpanCache.java index ed84f05..2af5afc 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/ISpanCache.java @@ -16,15 +16,13 @@ * */ -package org.apache.skywalking.apm.collector.analysis.register.define.service; +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache; -import org.apache.skywalking.apm.collector.core.module.Service; +import zipkin2.Span; /** - * @author peng-yongsheng + * @author wusheng */ -public interface IInstanceIDService extends Service { - int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo); - - int getOrCreateByAddressId(int applicationId, int addressId, long registerTime); +public interface ISpanCache { + void addSpan(Span span); } diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/caffeine/CaffeineSpanCache.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/caffeine/CaffeineSpanCache.java new file mode 100644 index 0000000..06a9a01 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/cache/caffeine/CaffeineSpanCache.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.caffeine; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.ISpanCache; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.Zipkin2SkyWalkingTransfer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.Span; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * NOTICE: FROM my test, Caffeine cache triggers/checks expire only face write/read op. + * In order to make trace finish in time, I have to set a timer to write a meaningless trace, for active expire. + * + * @author wusheng + */ +public class CaffeineSpanCache implements ISpanCache, RemovalListener<String, ZipkinTrace> { + private static final Logger logger = LoggerFactory.getLogger(CaffeineSpanCache.class); + private Cache<String, ZipkinTrace> inProcessSpanCache; + private ReentrantLock newTraceLock; + + public CaffeineSpanCache(ZipkinReceiverConfig config) { + newTraceLock = new ReentrantLock(); + inProcessSpanCache = Caffeine.newBuilder() + .expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS) + .maximumSize(config.getMaxCacheSize()) + .removalListener(this) + .build(); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace()); + }, 2, 3, TimeUnit.SECONDS); + } + + /** + * Zipkin trace finished by the expired rule. + * + * @param key + * @param trace + * @param cause + */ + @Override + public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) { + if (trace instanceof ZipkinTrace.TriggerTrace) { + return; + } + try { + Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace); + } catch (Exception e) { + logger.error(e.getMessage(), e); + logger.warn("Zipkin trace:" + trace); + } + } + + @Override + public void addSpan(Span span) { + ZipkinTrace trace = inProcessSpanCache.getIfPresent(span.traceId()); + if (trace == null) { + newTraceLock.lock(); + try { + trace = inProcessSpanCache.getIfPresent(span.traceId()); + if (trace == null) { + trace = new ZipkinTrace(); + inProcessSpanCache.put(span.traceId(), trace); + } + } finally { + newTraceLock.unlock(); + } + } + trace.addSpan(span); + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/data/SkyWalkingTrace.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/data/SkyWalkingTrace.java new file mode 100644 index 0000000..517bc03 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/data/SkyWalkingTrace.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.data; + +import org.apache.skywalking.apm.network.proto.TraceSegmentObject; +import org.apache.skywalking.apm.network.proto.UniqueId; +import org.apache.skywalking.apm.network.proto.UpstreamSegment; + +import java.util.LinkedList; +import java.util.List; + +/** + * Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s + */ +public class SkyWalkingTrace { + private UniqueId globalTraceId; + private List<TraceSegmentObject.Builder> segmentList; + + public SkyWalkingTrace(UniqueId globalTraceId, List<TraceSegmentObject.Builder> segmentList) { + this.globalTraceId = globalTraceId; + this.segmentList = segmentList; + } + + public List<UpstreamSegment.Builder> toUpstreamSegment() { + List<UpstreamSegment.Builder> newUpstreamList = new LinkedList<>(); + segmentList.forEach(segment -> { + UpstreamSegment.Builder builder = UpstreamSegment.newBuilder(); + builder.addGlobalTraceIds(globalTraceId); + builder.setSegment(segment.build().toByteString()); + newUpstreamList.add(builder); + }); + return newUpstreamList; + } + + public UniqueId getGlobalTraceId() { + return globalTraceId; + } + + public List<TraceSegmentObject.Builder> getSegmentList() { + return segmentList; + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/data/ZipkinTrace.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/data/ZipkinTrace.java new file mode 100644 index 0000000..7708453 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/data/ZipkinTrace.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.data; + +import zipkin2.Span; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author wusheng + */ +public class ZipkinTrace { + private List<Span> spans; + private ReentrantLock spanWriteLock; + + public ZipkinTrace() { + spans = new LinkedList<>(); + spanWriteLock = new ReentrantLock(); + } + + public void addSpan(Span span) { + spanWriteLock.lock(); + try { + spans.add(span); + } finally { + spanWriteLock.unlock(); + } + } + + public List<Span> getSpans() { + return spans; + } + + @Override + public String toString() { + return "ZipkinTrace{" + + "spans=" + spans + + '}'; + } + + public static class TriggerTrace extends ZipkinTrace { + + + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanProcessor.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanProcessor.java new file mode 100644 index 0000000..f791846 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler; + +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.CacheFactory; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; + +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.util.List; + +public class SpanProcessor { + void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request, RegisterServices registerServices) throws IOException { + int len = request.getContentLength(); + ServletInputStream iii = request.getInputStream(); + byte[] buffer = new byte[len]; + iii.read(buffer, 0, len); + + List<Span> spanList = decoder.decodeList(buffer); + + spanList.forEach(span -> { + // In Zipkin, the local service name represents the application owner. + String applicationCode = span.localServiceName(); + if (applicationCode != null) { + int applicationId = registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode); + if (applicationId != 0) { + registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode); + } + } + + CacheFactory.INSTANCE.get(config).addSpan(span); + }); + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanV1JettyHandler.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanV1JettyHandler.java new file mode 100644 index 0000000..a75c12d --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanV1JettyHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler; + +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig; +import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.codec.SpanBytesDecoder; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class SpanV1JettyHandler extends JettyHandler { + private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); + + private ZipkinReceiverConfig config; + private RegisterServices registerServices; + + public SpanV1JettyHandler(ZipkinReceiverConfig config, + RegisterServices registerServices) { + this.config = config; + this.registerServices = registerServices; + } + + @Override + public String pathSpec() { + return "/api/v1/spans"; + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) { + response.setContentType("application/json"); + response.setCharacterEncoding("utf-8"); + + try { + String type = request.getHeader("Content-Type"); + + SpanBytesDecoder decoder = type != null && type.contains("/x-thrift") + ? SpanBytesDecoder.THRIFT + : SpanBytesDecoder.JSON_V1; + + SpanProcessor processor = new SpanProcessor(); + processor.convert(config, decoder, request, registerServices); + + response.setStatus(202); + } catch (Exception e) { + response.setStatus(500); + + logger.error(e.getMessage(), e); + } + } + +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanV2JettyHandler.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanV2JettyHandler.java new file mode 100644 index 0000000..7e044d4 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/handler/SpanV2JettyHandler.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler; + +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig; +import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.codec.SpanBytesDecoder; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * @author wusheng + */ +public class SpanV2JettyHandler extends JettyHandler { + private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); + + private ZipkinReceiverConfig config; + private RegisterServices registerServices; + + public SpanV2JettyHandler(ZipkinReceiverConfig config, + RegisterServices registerServices) { + this.config = config; + this.registerServices = registerServices; + } + + @Override + public String pathSpec() { + return "/api/v2/spans"; + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) { + response.setContentType("application/json"); + response.setCharacterEncoding("utf-8"); + + try { + String type = request.getHeader("Content-Type"); + + SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf") + ? SpanBytesDecoder.PROTO3 + : SpanBytesDecoder.JSON_V2; + + SpanProcessor processor = new SpanProcessor(); + processor.convert(config, decoder, request, registerServices); + + response.setStatus(202); + } catch (Exception e) { + response.setStatus(500); + + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SegmentBuilder.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SegmentBuilder.java new file mode 100644 index 0000000..8266eac --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SegmentBuilder.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform; + +import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; +import org.apache.skywalking.apm.collector.core.util.StringUtils; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace; +import org.apache.skywalking.apm.network.proto.*; +import org.eclipse.jetty.util.StringUtil; +import zipkin2.Endpoint; +import zipkin2.Span; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author wusheng + */ +public class SegmentBuilder { + private Context context; + private LinkedList<Segment> segments; + private Map<String, ClientSideSpan> clientPartSpan; + + private SegmentBuilder() { + segments = new LinkedList<>(); + context = new Context(); + clientPartSpan = new HashMap<>(); + } + + public static SkyWalkingTrace build(List<Span> traceSpans, + RegisterServices registerServices, + IInstanceHeartBeatService instanceHeartBeatService) throws Exception { + SegmentBuilder builder = new SegmentBuilder(); + // This map groups the spans by their parent id, in order to assist to build tree. + // key: parentId + // value: span + Map<String, List<Span>> childSpanMap = new HashMap<>(); + AtomicReference<Span> root = new AtomicReference<>(); + traceSpans.forEach(span -> { + if (span.parentId() == null) { + root.set(span); + } + List<Span> spanList = childSpanMap.get(span.parentId()); + if (spanList == null) { + spanList = new LinkedList<>(); + spanList.add(span); + childSpanMap.put(span.parentId(), spanList); + } else { + spanList.add(span); + } + }); + + Span rootSpan = root.get(); + if (rootSpan != null) { + String applicationCode = rootSpan.localServiceName(); + // If root span doesn't include applicationCode, a.k.a local service name, + // Segment can't be built + // Ignore the whole trace. + // :P Hope anyone could provide better solution. + // Wu Sheng. + if (StringUtils.isNotEmpty(applicationCode)) { + builder.context.addApp(applicationCode, registerServices); + + SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true); + builder.context.currentSegment().addSpan(rootSpanBuilder); + builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap, registerServices); + + builder.segments.add(builder.context.removeApp()); + } + } + + List<TraceSegmentObject.Builder> segmentBuilders = new LinkedList<>(); + builder.segments.forEach(segment -> { + TraceSegmentObject.Builder traceSegmentBuilder = segment.freeze(); + segmentBuilders.add(traceSegmentBuilder); + instanceHeartBeatService.heartBeat(traceSegmentBuilder.getApplicationInstanceId(), segment.getEndTime()); + }); + return new SkyWalkingTrace(builder.generateTraceOrSegmentId(), segmentBuilders); + } + + private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent, + Map<String, List<Span>> childSpanMap, + RegisterServices registerServices) throws Exception { + String parentId = parent.id(); + // get child spans by parent span id + List<Span> spanList = childSpanMap.get(parentId); + if (spanList == null) { + return; + } + for (Span childSpan : spanList) { + String localServiceName = childSpan.localServiceName(); + boolean isNewApp = false; + if (StringUtil.isNotBlank(localServiceName)) { + if (context.isAppChanged(localServiceName)) { + isNewApp = true; + } + } + + try { + if (isNewApp) { + context.addApp(localServiceName, registerServices); + } + SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp); + + context.currentSegment().addSpan(childSpanBuilder); + scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap, registerServices); + + } finally { + if (isNewApp) { + segments.add(context.removeApp()); + } + } + } + } + + private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span, + boolean isSegmentRoot) { + SpanObject.Builder spanBuilder = SpanObject.newBuilder(); + spanBuilder.setSpanId(context.currentIDs().nextSpanId()); + if (isSegmentRoot) { + // spanId = -1, means no parent span + // spanId is considered unique, and from a positive sequence in each segment. + spanBuilder.setParentSpanId(-1); + } + if (!isSegmentRoot && parentSegmentSpan != null) { + spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId()); + } + Span.Kind kind = span.kind(); + spanBuilder.setOperationName(span.name()); + ClientSideSpan clientSideSpan; + switch (kind) { + case CLIENT: + spanBuilder.setSpanType(SpanType.Exit); + String peer = endpoint2Peer(span.remoteEndpoint()); + if (peer != null) { + spanBuilder.setPeer(peer); + } + clientSideSpan = new ClientSideSpan(span, spanBuilder); + clientPartSpan.put(span.id(), clientSideSpan); + break; + case SERVER: + spanBuilder.setSpanType(SpanType.Entry); + this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan); + break; + case CONSUMER: + spanBuilder.setSpanType(SpanType.Entry); + this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan); + break; + case PRODUCER: + spanBuilder.setSpanType(SpanType.Exit); + peer = endpoint2Peer(span.remoteEndpoint()); + if (peer != null) { + spanBuilder.setPeer(peer); + } + clientSideSpan = new ClientSideSpan(span, spanBuilder); + clientPartSpan.put(span.id(), clientSideSpan); + break; + default: + spanBuilder.setSpanType(SpanType.Local); + } + // microseconds in Zipkin -> milliseconds in SkyWalking + long startTime = span.timestamp() / 1000; + long duration = span.duration() / 1000; + spanBuilder.setStartTime(startTime); + spanBuilder.setEndTime(startTime + duration); + + span.tags().forEach((tagKey, tagValue) -> spanBuilder.addTags( + KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build()) + ); + + span.annotations().forEach(annotation -> + spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData( + KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build() + )) + ); + + return spanBuilder; + } + + private String endpoint2Peer(Endpoint endpoint) { + if (endpoint == null) { + return null; + } + String ip = null; + if (StringUtils.isNotEmpty(endpoint.ipv4())) { + ip = endpoint.ipv4(); + } else if (StringUtils.isNotEmpty(endpoint.ipv6())) { + ip = endpoint.ipv6(); + } + if (StringUtils.isEmpty(ip)) { + return null; + } + int port = endpoint.port(); + return port == 0 ? ip : ip + ":" + port; + } + + private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan, + Span parentSpan) { + Segment parentSegment = context.parentSegment(); + if (parentSegment == null) { + return; + } + Segment rootSegment = context.rootSegment(); + if (rootSegment == null) { + return; + } + + if (span.shared() != null && span.shared()) { + // using same span id in client and server for RPC + // SkyWalking will build both sides of span + ClientSideSpan clientSideSpan = clientPartSpan.get(span.id()); + parentSegmentSpan = clientSideSpan.getBuilder(); + parentSpan = clientSideSpan.getSpan(); + } + + String ip = null; + int port = 0; + Endpoint serverEndpoint = span.localEndpoint(); + Endpoint clientEndpoint = parentSpan.remoteEndpoint(); + if (clientEndpoint != null) { + if (StringUtil.isBlank(ip)) { + if (StringUtils.isNotEmpty(clientEndpoint.ipv4())) { + ip = clientEndpoint.ipv4(); + } else if (StringUtils.isNotEmpty(clientEndpoint.ipv6())) { + ip = clientEndpoint.ipv6(); + } + port = clientEndpoint.port(); + } + } + if (serverEndpoint != null) { + if (StringUtils.isNotEmpty(serverEndpoint.ipv4())) { + ip = serverEndpoint.ipv4(); + } else if (StringUtils.isNotEmpty(serverEndpoint.ipv6())) { + ip = serverEndpoint.ipv6(); + } + } + + if (StringUtil.isBlank(ip)) { + //The IP is the most important for building the ref at both sides. + return; + } + + TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder(); + refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId()); + int serviceId = rootSegment.getEntryServiceId(); + if (serviceId == 0) { + refBuilder.setEntryServiceName(rootSegment.getEntryServiceName()); + } else { + refBuilder.setEntryServiceId(serviceId); + } + refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId()); + + // parent ref info + refBuilder.setNetworkAddress(port == 0 ? ip : ip + ":" + port); + parentSegmentSpan.setPeer(refBuilder.getNetworkAddress()); + refBuilder.setParentApplicationInstanceId(parentSegment.builder().getApplicationInstanceId()); + refBuilder.setParentSpanId(parentSegmentSpan.getSpanId()); + refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId()); + int parentServiceId = parentSegment.getEntryServiceId(); + if (parentServiceId == 0) { + refBuilder.setParentServiceName(parentSegment.getEntryServiceName()); + } else { + refBuilder.setParentServiceId(parentServiceId); + } + refBuilder.setRefType(RefType.CrossProcess); + + spanBuilder.addRefs(refBuilder); + } + + /** + * Context holds the values in build process. + */ + private class Context { + private LinkedList<Segment> segmentsStack = new LinkedList<>(); + + private boolean isAppChanged(String applicationCode) { + return StringUtils.isNotEmpty(applicationCode) && !applicationCode.equals(currentIDs().applicationCode); + } + + private Segment addApp(String applicationCode, + RegisterServices registerServices) throws Exception { + int applicationId = waitForExchange(() -> + registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode), + 10 + ); + + int appInstanceId = waitForExchange(() -> + registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode), + 10 + ); + + Segment segment = new Segment(applicationCode, applicationId, appInstanceId); + segmentsStack.add(segment); + return segment; + } + + private IDCollection currentIDs() { + return segmentsStack.getLast().ids; + } + + private Segment currentSegment() { + return segmentsStack.getLast(); + } + + private Segment parentSegment() { + if (segmentsStack.size() < 2) { + return null; + } else { + return segmentsStack.get(segmentsStack.size() - 2); + } + + } + + private Segment rootSegment() { + if (segmentsStack.size() < 2) { + return null; + } else { + return segmentsStack.getFirst(); + } + } + + private Segment removeApp() { + return segmentsStack.removeLast(); + } + + private int waitForExchange(Callable<Integer> callable, int retry) throws Exception { + for (int i = 0; i < retry; i++) { + Integer id = callable.call(); + if (id == 0) { + Thread.sleep(1000L); + } else { + return id; + } + } + throw new TimeoutException("ID exchange costs more than expected."); + } + } + + private class Segment { + private TraceSegmentObject.Builder segmentBuilder; + private IDCollection ids; + private int entryServiceId = 0; + private String entryServiceName = null; + private List<SpanObject.Builder> spans; + private long endTime = 0; + + private Segment(String applicationCode, int applicationId, int appInstanceId) { + ids = new IDCollection(applicationCode, applicationId, appInstanceId); + spans = new LinkedList<>(); + segmentBuilder = TraceSegmentObject.newBuilder(); + segmentBuilder.setApplicationId(applicationId); + segmentBuilder.setApplicationInstanceId(appInstanceId); + segmentBuilder.setTraceSegmentId(generateTraceOrSegmentId()); + } + + private TraceSegmentObject.Builder builder() { + return segmentBuilder; + } + + private void addSpan(SpanObject.Builder spanBuilder) { + String operationName = spanBuilder.getOperationName(); + if (entryServiceId == 0 && StringUtils.isNotEmpty(operationName)) { + if (SpanType.Entry == spanBuilder.getSpanType()) { + if (StringUtils.isNotEmpty(operationName)) { + entryServiceName = operationName; + } else { + entryServiceId = spanBuilder.getOperationNameId(); + } + } + } + + // init by root span + if (spanBuilder.getSpanId() == 1 && entryServiceId == 0) { + if (StringUtils.isNotEmpty(operationName)) { + entryServiceName = operationName; + } else { + entryServiceId = spanBuilder.getOperationNameId(); + } + } + + spans.add(spanBuilder); + if (spanBuilder.getEndTime() > endTime) { + endTime = spanBuilder.getEndTime(); + } + } + + public int getEntryServiceId() { + return entryServiceId; + } + + public String getEntryServiceName() { + return entryServiceName; + } + + private IDCollection ids() { + return ids; + } + + public TraceSegmentObject.Builder freeze() { + for (SpanObject.Builder span : spans) { + segmentBuilder.addSpans(span); + } + return segmentBuilder; + } + + public long getEndTime() { + return endTime; + } + } + + private class IDCollection { + private String applicationCode; + private int appId; + private int instanceId; + private int spanIdSeq; + + private IDCollection(String applicationCode, int appId, int instanceId) { + this.applicationCode = applicationCode; + this.appId = appId; + this.instanceId = instanceId; + this.spanIdSeq = 0; + } + + private int nextSpanId() { + return spanIdSeq++; + } + } + + private UniqueId generateTraceOrSegmentId() { + return UniqueId.newBuilder() + .addIdParts(ThreadLocalRandom.current().nextLong()) + .addIdParts(ThreadLocalRandom.current().nextLong()) + .addIdParts(ThreadLocalRandom.current().nextLong()) + .build(); + } + + private class ClientSideSpan { + private Span span; + private SpanObject.Builder builder; + + public ClientSideSpan(Span span, SpanObject.Builder builder) { + this.span = span; + this.builder = builder; + } + + public Span getSpan() { + return span; + } + + public SpanObject.Builder getBuilder() { + return builder; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SegmentListener.java similarity index 65% copy from apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java copy to apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SegmentListener.java index ed84f05..72623e0 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-define/src/main/java/org/apache/skywalking/apm/collector/analysis/register/define/service/IInstanceIDService.java +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SegmentListener.java @@ -16,15 +16,10 @@ * */ -package org.apache.skywalking.apm.collector.analysis.register.define.service; +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform; -import org.apache.skywalking.apm.collector.core.module.Service; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace; -/** - * @author peng-yongsheng - */ -public interface IInstanceIDService extends Service { - int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo); - - int getOrCreateByAddressId(int applicationId, int addressId, long registerTime); +public interface SegmentListener { + void notify(SkyWalkingTrace trace); } diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/Zipkin2SkyWalkingTransfer.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/Zipkin2SkyWalkingTransfer.java new file mode 100644 index 0000000..174903b --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/Zipkin2SkyWalkingTransfer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform; + +import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace; +import zipkin2.Span; + +import java.util.LinkedList; +import java.util.List; + +/** + * @author wusheng + */ +public class Zipkin2SkyWalkingTransfer { + public static Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer(); + private RegisterServices registerServices; + private IInstanceHeartBeatService instanceHeartBeatService; + private List<SegmentListener> listeners = new LinkedList<>(); + + private Zipkin2SkyWalkingTransfer() { + } + + public void setRegisterServices( + RegisterServices registerServices) { + this.registerServices = registerServices; + } + + public void setInstanceHeartBeatService(IInstanceHeartBeatService instanceHeartBeatService) { + this.instanceHeartBeatService = instanceHeartBeatService; + } + + public void addListener(SegmentListener listener) { + listeners.add(listener); + } + + public void transfer(ZipkinTrace trace) throws Exception { + List<Span> traceSpans = trace.getSpans(); + + if (traceSpans.size() > 0) { + SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans, registerServices, instanceHeartBeatService); + + listeners.forEach(listener -> + listener.notify(skyWalkingTrace) + ); + + } + } +} diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleProvider b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleProvider new file mode 100644 index 0000000..9046f75 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/main/resources/META-INF/services/org.apache.skywalking.apm.collector.core.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverProvider diff --git a/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/test/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SpringSleuthSegmentBuilderTest.java b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/test/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SpringSleuthSegmentBuilderTest.java new file mode 100644 index 0000000..8a5e1c7 --- /dev/null +++ b/apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/receiver-zipkin-provider/src/test/java/org/apache/skywalking/apm/collector/receiver/zipkin/provider/transform/SpringSleuthSegmentBuilderTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform; + +import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace; +import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace; +import org.apache.skywalking.apm.network.proto.SpanObject; +import org.apache.skywalking.apm.network.proto.SpanType; +import org.apache.skywalking.apm.network.proto.TraceSegmentObject; +import org.apache.skywalking.apm.network.proto.TraceSegmentReference; +import org.junit.Assert; +import org.junit.Test; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; + +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * @author wusheng + */ +public class SpringSleuthSegmentBuilderTest implements SegmentListener { + private Map<String, Integer> applicationInstRegister = new HashMap<>(); + private Map<String, Integer> applicationRegister = new HashMap<>(); + private int appIdSeg = 1; + private int appInstIdSeq = 1; + + @Test + public void testTransform() throws Exception { + + IApplicationIDService applicationIDService = new IApplicationIDService() { + @Override + public int getOrCreateForApplicationCode(String applicationCode) { + String key = "AppCode:" + applicationCode; + if (applicationRegister.containsKey(key)) { + return applicationRegister.get(key); + } else { + int id = appIdSeg++; + applicationRegister.put(key, id); + return id; + } + } + + @Override + public int getOrCreateForAddressId(int addressId, String networkAddress) { + String key = "Address:" + networkAddress; + if (applicationRegister.containsKey(key)) { + return applicationRegister.get(key); + } else { + int id = appIdSeg++; + applicationRegister.put(key, id); + return id; + } + } + }; + + IInstanceIDService instanceIDService = new IInstanceIDService() { + @Override + public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo) { + String key = "AppCode:" + applicationId + ",UUID:" + agentUUID; + if (applicationInstRegister.containsKey(key)) { + return applicationInstRegister.get(key); + } else { + int id = appInstIdSeq++; + applicationInstRegister.put(key, id); + return id; + } + } + + @Override + public int getOrCreateByAddressId(int applicationId, int addressId, long registerTime) { + String key = "VitualAppCode:" + applicationId + ",address:" + addressId; + if (applicationInstRegister.containsKey(key)) { + return applicationInstRegister.get(key); + } else { + int id = appInstIdSeq++; + applicationInstRegister.put(key, id); + return id; + } + } + }; + RegisterServices services = new RegisterServices(applicationIDService, instanceIDService, null, null); + + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this); + Zipkin2SkyWalkingTransfer.INSTANCE.setRegisterServices(services); + Zipkin2SkyWalkingTransfer.INSTANCE.setInstanceHeartBeatService(new IInstanceHeartBeatService() { + @Override + public void heartBeat(int instanceId, long heartBeatTime) { + + } + }); + + List<Span> spanList = buildSpringSleuthExampleTrace(); + Assert.assertEquals(3, spanList.size()); + + ZipkinTrace trace = new ZipkinTrace(); + spanList.forEach(span -> trace.addSpan(span)); + + Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace); + } + + private List<Span> buildSpringSleuthExampleTrace() throws UnsupportedEncodingException { + List<Span> spans = new LinkedList<>(); + String span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"id\":\"1a8a1b5bdd791b8a\",\"kind\":\"SERVER\",\"name\":\"get /\",\"timestamp\":1527669813700123,\"duration\":11295,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv6\":\"::1\",\"port\":55146},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/\",\"mvc.controller.class\":\"Frontend\",\"mvc.controller.method\":\"callBackend\"}}"; + spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8"))); + span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"CLIENT\",\"name\":\"get\",\"timestamp\":1527669813702456,\"duration\":6672,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\"}}"; + spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8"))); + span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"SERVER\",\"name\":\"get /api\",\"timestamp\":1527669813705106,\"duration\":4802,\"localEndpoint\":{\"serviceName\":\"backend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv4\":\"127.0.0.1\",\"port\":55147},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\",\"mvc.controller.class\":\"Backend\",\"mvc.controller.method\":\"printDate\"},\"s [...] + spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8"))); + + return SpanBytesDecoder.JSON_V2.decodeList(spans.toString().getBytes("UTF-8")); + } + + @Override + public void notify(SkyWalkingTrace trace) { + List<TraceSegmentObject.Builder> segments = trace.getSegmentList(); + Assert.assertEquals(2, segments.size()); + TraceSegmentObject.Builder builder = segments.get(0); + TraceSegmentObject.Builder builder1 = segments.get(1); + TraceSegmentObject.Builder front, end; + if (builder.getApplicationId() == applicationRegister.get("AppCode:frontend")) { + front = builder; + end = builder1; + Assert.assertEquals(applicationRegister.get("AppCode:backend").longValue(), builder1.getApplicationId()); + } else if (builder.getApplicationId() == applicationRegister.get("AppCode:backend")) { + end = builder; + front = builder1; + Assert.assertEquals(applicationRegister.get("AppCode:frontend").longValue(), builder1.getApplicationId()); + } else { + Assert.fail("Can't find frontend and backend applications. "); + return; + } + + Assert.assertEquals(2, front.getSpansCount()); + Assert.assertEquals(1, end.getSpansCount()); + + front.getSpansList().forEach(spanObject -> { + if (spanObject.getSpanId() == 0) { + // span id = 1, means incoming http of frontend + Assert.assertEquals(SpanType.Entry, spanObject.getSpanType()); + Assert.assertEquals("get /", spanObject.getOperationName()); + Assert.assertEquals(0, spanObject.getSpanId()); + Assert.assertEquals(-1, spanObject.getParentSpanId()); + } else if (spanObject.getSpanId() == 1) { + Assert.assertEquals("192.168.72.220", spanObject.getPeer()); + Assert.assertEquals(SpanType.Exit, spanObject.getSpanType()); + Assert.assertEquals(1, spanObject.getSpanId()); + Assert.assertEquals(0, spanObject.getParentSpanId()); + } else { + Assert.fail("Only two spans expected"); + } + Assert.assertTrue(spanObject.getTagsCount() > 0); + }); + + SpanObject spanObject = end.getSpans(0); + + Assert.assertEquals(1, spanObject.getRefsCount()); + TraceSegmentReference spanObjectRef = spanObject.getRefs(0); + Assert.assertEquals("get", spanObjectRef.getEntryServiceName()); + Assert.assertEquals("get", spanObjectRef.getParentServiceName()); + //Assert.assertEquals("192.168.72.220", spanObjectRef.getNetworkAddress()); + Assert.assertEquals(1, spanObjectRef.getParentSpanId()); + Assert.assertEquals(front.getTraceSegmentId(), spanObjectRef.getParentTraceSegmentId()); + + Assert.assertTrue(spanObject.getTagsCount() > 0); + + Assert.assertEquals("get /api", spanObject.getOperationName()); + Assert.assertEquals(0, spanObject.getSpanId()); + Assert.assertEquals(-1, spanObject.getParentSpanId()); + Assert.assertEquals(SpanType.Entry, spanObject.getSpanType()); + } +} diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java index ae49a8e..56f36c4 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java @@ -39,7 +39,7 @@ import javax.servlet.http.HttpServletRequest; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.core.util.CollectionUtils; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; import org.apache.skywalking.apm.collector.storage.ui.application.ApplicationNode; import org.apache.skywalking.apm.collector.storage.ui.application.ConjecturalNode; import org.apache.skywalking.apm.collector.storage.ui.common.VisualUserNode; @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class GraphQLHandler extends JettyHandler { +public class GraphQLHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(GraphQLHandler.class); diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/naming/UIJettyNamingHandler.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/naming/UIJettyNamingHandler.java index dc035f8..e9920dd 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/naming/UIJettyNamingHandler.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/naming/UIJettyNamingHandler.java @@ -24,12 +24,12 @@ import com.google.gson.JsonElement; import java.util.Set; import javax.servlet.http.HttpServletRequest; import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException; -import org.apache.skywalking.apm.collector.server.jetty.JettyHandler; +import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler; /** * @author peng-yongsheng */ -public class UIJettyNamingHandler extends JettyHandler { +public class UIJettyNamingHandler extends JettyJsonHandler { private final UIJettyNamingListener namingListener; diff --git a/apm-collector/pom.xml b/apm-collector/pom.xml index 56702ac..11beffb 100644 --- a/apm-collector/pom.xml +++ b/apm-collector/pom.xml @@ -45,6 +45,7 @@ <module>apm-collector-configuration</module> <module>apm-collector-agent</module> <module>apm-collector-analysis</module> + <module>apm-collector-thirdparty-receiver</module> </modules> <properties> @@ -63,6 +64,7 @@ <jedis.version>2.9.0</jedis.version> <zookeeper.version>3.4.10</zookeeper.version> <elasticsearch.client.version>5.5.0</elasticsearch.client.version> + <zipkin.version>2.9.1</zipkin.version> <shardingjdbc.version>2.0.3</shardingjdbc.version> <commons-dbcp.version>1.4</commons-dbcp.version> </properties> @@ -248,6 +250,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.zipkin.zipkin2</groupId> + <artifactId>zipkin</artifactId> + <version>${zipkin.version}</version> + </dependency> </dependencies> </dependencyManagement> </project> diff --git a/apm-dist/release-docs/LICENSE b/apm-dist/release-docs/LICENSE index 8984e46..f0098a6 100644 --- a/apm-dist/release-docs/LICENSE +++ b/apm-dist/release-docs/LICENSE @@ -289,6 +289,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. validation-api 1.1.0.Final: http://beanvalidation.org/licensing/, Apache 2.0 zuul-core 1.3.0: https://github.com/Netflix/zuul, Apache 2.0 ben-manes caffeine 2.6.2: https://github.com/ben-manes/caffeine, Apache 2.0 + zipkin 2.9.1: https://github.com/openzipkin/zipkin, Apache 2.0 sharding-jdbc-core 2.0.3: https://github.com/sharding-sphere/sharding-sphere, Apache 2.0 ======================================================================== diff --git a/docs/README.md b/docs/README.md index 09b4e19..bf34c05 100644 --- a/docs/README.md +++ b/docs/README.md @@ -21,6 +21,7 @@ * Incubating Features * [Why are some features in **Incubating**?](en/Incubating/Abstract.md) * [Use Sharding JDBC as storage implementor](en/Use-ShardingJDBC-as-storage-implementor.md) + * [Receive Zipkin span data format](../apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/docs/README.md) * Application Toolkit * [Overview](en/Applicaton-toolkit.md) * [Use SkyWalking OpenTracing compatible tracer](en/Opentracing.md) diff --git a/docs/en/Incubating/Abstract.md b/docs/en/Incubating/Abstract.md index 452146d..34769d3 100644 --- a/docs/en/Incubating/Abstract.md +++ b/docs/en/Incubating/Abstract.md @@ -10,3 +10,4 @@ List some typical incubating features 1. New storage implementor in collector, such as: ElasticSearch HTTP, MySQL... 1. New module provided in collector. 1. New optional plugins in agent. +1. New features/services in collector, as the integration. -- To stop receiving notification emails like this one, please contact wush...@apache.org.