This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ab02f4f [functions][stats] don't generate function stats at worker service if runtime is k8s (#2724) ab02f4f is described below commit ab02f4fdb26d5dc19a4cda85ee2fe33f5a774ad3 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Fri Oct 5 00:51:26 2018 -0700 [functions][stats] don't generate function stats at worker service if runtime is k8s (#2724) *Motivation* k8s runtime doesn't support generating function stats at worker service right now. so skip it for now until that feature is added. *Changes* skip function stats for k8s runtime --- .../pulsar/functions/worker/FunctionsStatsGenerator.java | 6 ++++++ .../functions/worker/FunctionStatsGeneratorTest.java | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 3e219d6..80c1b77 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.eclipse.jetty.util.ConcurrentHashSet; @@ -40,6 +41,11 @@ public class FunctionsStatsGenerator { public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) { // only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE. if (workerService != null && workerService.isInitialized()) { + // kubernetes runtime factory doesn't support stats collection through worker service + if (workerService.getFunctionRuntimeManager().getRuntimeFactory() instanceof KubernetesRuntimeFactory) { + return; + } + Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index 849d05d..68a13b4 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -25,6 +25,7 @@ import lombok.ToString; import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.testng.Assert; @@ -60,6 +61,20 @@ public class FunctionStatsGeneratorTest { } @Test + public void testGenerateFunctionStatsOnK8SRuntimeFactory() { + WorkerService workerService = mock(WorkerService.class); + when(workerService.isInitialized()).thenReturn(true); + FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class); + when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class)); + when(workerService.getFunctionRuntimeManager()).thenReturn(frm); + FunctionsStatsGenerator.generate( + workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer())); + verify(workerService, times(1)).isInitialized(); + verify(workerService, times(1)).getFunctionRuntimeManager(); + verify(frm, times(0)).getFunctionRuntimeInfos(); + } + + @Test public void testFunctionsStatsGenerate() { FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new HashMap<>();