peng-yongsheng closed pull request #817: Instance heart beat tested with 
elastic search. 
URL: https://github.com/apache/incubator-skywalking/pull/817
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this pull request comes from a fork, GitHub will not display its
diff after the merge, so the diff is supplied below:

diff --git 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java
 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java
index cde3e4c69..2b20ce391 100644
--- 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java
+++ 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java
@@ -66,6 +66,8 @@ public void registerInstance(ApplicationInstance request,
         int instanceId = request.getApplicationInstanceId();
         long heartBeatTime = request.getHeartbeatTime();
         this.instanceHeartBeatService.heartBeat(instanceId, heartBeatTime);
+        responseObserver.onNext(Downstream.getDefaultInstance());
+        responseObserver.onCompleted();
     }
 
     private String buildOsInfo(OSInfo osinfo) {
diff --git 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
index 6bf5d1690..11c7f070a 100644
--- 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
+++ 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
@@ -20,8 +20,12 @@
 
 import io.grpc.ManagedChannel;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.apm.network.proto.Application;
 import org.apache.skywalking.apm.network.proto.ApplicationInstance;
+import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
+import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
 import org.apache.skywalking.apm.network.proto.ApplicationMapping;
 import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
 import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
@@ -29,18 +33,24 @@
 import org.apache.skywalking.apm.network.proto.ServiceNameCollection;
 import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
 import org.apache.skywalking.apm.network.proto.ServiceNameElement;
+import org.apache.skywalking.apm.network.proto.ServiceNameMappingCollection;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
  */
 class RegisterMock {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(RegisterMock.class);
+
     private 
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub 
applicationRegisterServiceBlockingStub;
     private InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub 
instanceDiscoveryServiceBlockingStub;
     private 
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub 
serviceNameDiscoveryServiceBlockingStub;
 
-    void mock(ManagedChannel channel) {
+    void mock(ManagedChannel channel) throws InterruptedException {
         applicationRegisterServiceBlockingStub = 
ApplicationRegisterServiceGrpc.newBlockingStub(channel);
         instanceDiscoveryServiceBlockingStub = 
InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
         serviceNameDiscoveryServiceBlockingStub = 
ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
@@ -48,10 +58,17 @@ void mock(ManagedChannel channel) {
         registerProvider();
     }
 
-    private void registerConsumer() {
+    private void registerConsumer() throws InterruptedException {
         Application.Builder application = Application.newBuilder();
         application.setApplicationCode("dubbox-consumer");
-        ApplicationMapping applicationMapping = 
applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
+
+        ApplicationMapping applicationMapping;
+        do {
+            applicationMapping = 
applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
+            logger.debug("application id: {}", 
applicationMapping.getApplication().getValue());
+            Thread.sleep(20);
+        }
+        while (applicationMapping.getApplication().getValue() == 0);
 
         ApplicationInstance.Builder instance = 
ApplicationInstance.newBuilder();
         
instance.setApplicationId(applicationMapping.getApplication().getValue());
@@ -65,20 +82,37 @@ private void registerConsumer() {
         osInfo.addIpv4S("10.0.0.3");
         osInfo.addIpv4S("10.0.0.4");
         instance.setOsinfo(osInfo);
-        
instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
+
+        ApplicationInstanceMapping instanceMapping;
+        do {
+            instanceMapping = 
instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
+            logger.debug("instance id: {}", 
instanceMapping.getApplicationInstanceId());
+            Thread.sleep(20);
+        }
+        while (instanceMapping.getApplicationInstanceId() == 0);
 
         ServiceNameCollection.Builder serviceNameCollection = 
ServiceNameCollection.newBuilder();
         ServiceNameElement.Builder serviceNameElement = 
ServiceNameElement.newBuilder();
         
serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
         
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
         serviceNameCollection.addElements(serviceNameElement);
-        
serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
+
+        registerServiceName(serviceNameCollection);
+
+        heartBeatScheduled(instanceMapping.getApplicationInstanceId());
     }
 
-    private void registerProvider() {
+    private void registerProvider() throws InterruptedException {
         Application.Builder application = Application.newBuilder();
         application.setApplicationCode("dubbox-provider");
-        ApplicationMapping applicationMapping = 
applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
+
+        ApplicationMapping applicationMapping;
+        do {
+            applicationMapping = 
applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
+            logger.debug("application id: {}", 
applicationMapping.getApplication().getValue());
+            Thread.sleep(20);
+        }
+        while (applicationMapping.getApplication().getValue() == 0);
 
         ApplicationInstance.Builder instance = 
ApplicationInstance.newBuilder();
         
instance.setApplicationId(applicationMapping.getApplication().getValue());
@@ -92,13 +126,51 @@ private void registerProvider() {
         osInfo.addIpv4S("10.0.0.1");
         osInfo.addIpv4S("10.0.0.2");
         instance.setOsinfo(osInfo);
-        
instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
+
+        ApplicationInstanceMapping instanceMapping;
+        do {
+            instanceMapping = 
instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
+            logger.debug("instance id: {}", 
instanceMapping.getApplicationInstanceId());
+            Thread.sleep(20);
+        }
+        while (instanceMapping.getApplicationInstanceId() == 0);
 
         ServiceNameCollection.Builder serviceNameCollection = 
ServiceNameCollection.newBuilder();
         ServiceNameElement.Builder serviceNameElement = 
ServiceNameElement.newBuilder();
         
serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
         
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
         serviceNameCollection.addElements(serviceNameElement);
-        
serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
+
+        registerServiceName(serviceNameCollection);
+
+        heartBeatScheduled(instanceMapping.getApplicationInstanceId());
+    }
+
+    private void registerServiceName(ServiceNameCollection.Builder 
serviceNameCollection) throws InterruptedException {
+        ServiceNameMappingCollection serviceNameMappingCollection;
+        do {
+            serviceNameMappingCollection = 
serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
+            logger.debug("service name mapping collection size: {}", 
serviceNameMappingCollection.getElementsCount());
+            if (serviceNameMappingCollection.getElementsCount() > 0) {
+                logger.debug("service id: {}", 
serviceNameMappingCollection.getElements(0).getServiceId());
+            }
+            Thread.sleep(20);
+        }
+        while (serviceNameMappingCollection.getElementsCount() == 0 || 
serviceNameMappingCollection.getElements(0).getServiceId() == 0);
+    }
+
+    private void heartBeatScheduled(int instanceId) {
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+            new RunnableWithExceptionProtection(() -> heartBeat(instanceId),
+                t -> logger.error("instance heart beat scheduled error.", t)), 
4, 1, TimeUnit.SECONDS);
+    }
+
+    private void heartBeat(int instanceId) {
+        long now = System.currentTimeMillis();
+        logger.debug("instance heart beat, instance id: {}, time: {}", 
instanceId, now);
+        ApplicationInstanceHeartbeat.Builder heartbeat = 
ApplicationInstanceHeartbeat.newBuilder();
+        heartbeat.setApplicationInstanceId(instanceId);
+        heartbeat.setHeartbeatTime(now);
+        instanceDiscoveryServiceBlockingStub.heartbeat(heartbeat.build());
     }
 }
diff --git 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java
 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java
index 8fe9ee82a..c290bd30a 100644
--- 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java
+++ 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java
@@ -83,6 +83,8 @@ public static void main(String[] args) throws 
InterruptedException {
         while (sleeping.getValue()) {
             Thread.sleep(200);
         }
+
+        Thread.sleep(200000);
     }
 
     static class Sleeping {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to