This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ab02f4f   [functions][stats] don't generate function stats at worker service if runtime is k8s (#2724)
ab02f4f is described below

commit ab02f4fdb26d5dc19a4cda85ee2fe33f5a774ad3
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Oct 5 00:51:26 2018 -0700

     [functions][stats] don't generate function stats at worker service if runtime is k8s (#2724)
    
    *Motivation*
    
    The k8s runtime doesn't support generating function stats at the worker service right now,
    so skip it for now until that feature is added.
    
    *Changes*
    
    skip function stats for k8s runtime
---
 .../pulsar/functions/worker/FunctionsStatsGenerator.java  |  6 ++++++
 .../functions/worker/FunctionStatsGeneratorTest.java      | 15 +++++++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 3e219d6..80c1b77 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.eclipse.jetty.util.ConcurrentHashSet;
@@ -40,6 +41,11 @@ public class FunctionsStatsGenerator {
     public static void generate(WorkerService workerService, String cluster, 
SimpleTextOutputStream out) {
         // only when worker service is initialized, we generate the stats. 
otherwise we will get bunch of NPE.
         if (workerService != null && workerService.isInitialized()) {
+            // kubernetes runtime factory doesn't support stats collection 
through worker service
+            if (workerService.getFunctionRuntimeManager().getRuntimeFactory() 
instanceof KubernetesRuntimeFactory) {
+                return;
+            }
+
             Map<String, FunctionRuntimeInfo> functionRuntimes
                     = 
workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 849d05d..68a13b4 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -25,6 +25,7 @@ import lombok.ToString;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.testng.Assert;
@@ -60,6 +61,20 @@ public class FunctionStatsGeneratorTest {
     }
 
     @Test
+    public void testGenerateFunctionStatsOnK8SRuntimeFactory() {
+        WorkerService workerService = mock(WorkerService.class);
+        when(workerService.isInitialized()).thenReturn(true);
+        FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class);
+        
when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class));
+        when(workerService.getFunctionRuntimeManager()).thenReturn(frm);
+        FunctionsStatsGenerator.generate(
+            workerService, "test-cluster", new 
SimpleTextOutputStream(Unpooled.buffer()));
+        verify(workerService, times(1)).isInitialized();
+        verify(workerService, times(1)).getFunctionRuntimeManager();
+        verify(frm, times(0)).getFunctionRuntimeInfos();
+    }
+
+    @Test
     public void testFunctionsStatsGenerate() {
         FunctionRuntimeManager functionRuntimeManager = 
mock(FunctionRuntimeManager.class);
         Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
HashMap<>();

Reply via email to