This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c51c7200c2b86e9e5d58616ddaef6eda8e99b16
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 2 15:00:42 2022 +0200

    [Functions] Pass configured metricsPort to k8s runtime (#14502)
    
    (cherry picked from commit daed6a0b3ac94e77a3d7d4212ee297b9046317a2)
---
 conf/functions_worker.yml                          |  1 +
 .../runtime/kubernetes/KubernetesRuntime.java      | 18 +++++---
 .../kubernetes/KubernetesRuntimeFactory.java       |  7 ++-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 52 ++++++++++++++++++++++
 .../functions/worker/FunctionsStatsGenerator.java  |  5 ++-
 5 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 428c089..24d46e1 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -222,6 +222,7 @@ functionRuntimeFactoryConfigs:
 #    # The port inside the function pod which is used by the worker to 
communicate with the pod
 #    grpcPort: 9093
 #    # The port inside the function pod on which prometheus metrics are exposed
+#    # An empty value disables prometheus metrics.
 #    metricsPort: 9094
 #    # The directory inside the function pod where nar packages will be 
extracted
 #    narExtractionDirectory:
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a04ce72..59d688a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -426,7 +426,11 @@ public class KubernetesRuntime implements Runtime {
 
     @Override
     public String getPrometheusMetrics() throws IOException {
-        return RuntimeUtils.getPrometheusMetrics(metricsPort);
+        if (metricsPort != null) {
+            return RuntimeUtils.getPrometheusMetrics(metricsPort);
+        } else {
+            return null;
+        }
     }
 
     @Override
@@ -974,10 +978,14 @@ public class KubernetesRuntime implements Runtime {
     }
 
     private Map<String, String> getPrometheusAnnotations() {
-        final Map<String, String> annotations = new HashMap<>();
-        annotations.put("prometheus.io/scrape", "true");
-        annotations.put("prometheus.io/port", String.valueOf(metricsPort));
-        return annotations;
+        if (metricsPort != null) {
+            final Map<String, String> annotations = new HashMap<>();
+            annotations.put("prometheus.io/scrape", "true");
+            annotations.put("prometheus.io/port", String.valueOf(metricsPort));
+            return annotations;
+        } else {
+            return Collections.emptyMap();
+        }
     }
 
     private Map<String, String> getLabels(Function.FunctionDetails 
functionDetails) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index d98a161..4b2c7e7 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -290,6 +290,11 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
         String overriddenNamespace = manifestCustomizer.map((customizer) -> 
customizer.customizeNamespace(instanceConfig.getFunctionDetails(), 
jobNamespace)).orElse(jobNamespace);
         String overriddenName = manifestCustomizer.map((customizer) -> 
customizer.customizeName(instanceConfig.getFunctionDetails(), 
jobName)).orElse(jobName);
 
+        // pass metricsPort configured in 
functionRuntimeFactoryConfigs.metricsPort in functions_worker.yml
+        if (metricsPort != null) {
+            instanceConfig.setMetricsPort(metricsPort);
+        }
+
         return new KubernetesRuntime(
             appsClient,
             coreClient,
@@ -351,7 +356,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
             if (k8Uri == null) {
                 log.info("k8Uri is null thus going by defaults");
                 ApiClient cli;
-                if (submittingInsidePod) {
+                if (submittingInsidePod != null && submittingInsidePod) {
                     log.info("Looks like we are inside a k8 pod ourselves. 
Initializing as cluster");
                     cli = Config.fromCluster();
                 } else {
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index ef45d7f..995546c 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -29,12 +29,14 @@ import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
@@ -42,6 +44,7 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.ConnectorsManager;
@@ -66,6 +69,7 @@ import static 
org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
@@ -1145,4 +1149,52 @@ public class KubernetesRuntimeTest {
         String containerCommand = 
spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
         assertTrue(containerCommand.contains(expectedDownloadCommand));
     }
+
+    @Test
+    public void shouldUseConfiguredMetricsPort() throws Exception {
+        assertMetricsPortConfigured(Collections.singletonMap("metricsPort", 
12345), 12345);
+    }
+
+    @Test
+    public void shouldUseDefaultMetricsPortWhenMetricsPortIsntSet() throws 
Exception {
+        assertMetricsPortConfigured(Collections.emptyMap(), 9094);
+    }
+
+    @Test
+    public void shouldNotAddPrometheusAnnotationIfMetricsPortIsSetToEmpty() 
throws Exception {
+        assertMetricsPortConfigured(Collections.singletonMap("metricsPort", 
""), -1);
+    }
+
+    private void assertMetricsPortConfigured(Map<String, Object> 
functionRuntimeFactoryConfigs,
+                                             int expectedPort) throws 
Exception {
+        KubernetesRuntimeFactory kubernetesRuntimeFactory = new 
KubernetesRuntimeFactory();
+        WorkerConfig workerConfig = new WorkerConfig();
+        
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        
workerConfig.setFunctionRuntimeFactoryConfigs(functionRuntimeFactoryConfigs);
+        AuthenticationConfig authenticationConfig = 
AuthenticationConfig.builder().build();
+        kubernetesRuntimeFactory.initialize(workerConfig, 
authenticationConfig, new DefaultSecretsProviderConfigurator(), 
Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty());
+        InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
+        KubernetesRuntime container = 
kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, 30l);
+        V1PodTemplateSpec template = 
container.createStatefulSet().getSpec().getTemplate();
+        Map<String, String> annotations =
+                template.getMetadata().getAnnotations();
+        if (expectedPort != -1) {
+            // metrics port should be passed to k8s annotation for prometheus 
scraping
+            assertEquals(annotations.get("prometheus.io/port"), 
String.valueOf(expectedPort));
+            // scraping annotation should exist
+            assertEquals(annotations.get("prometheus.io/scrape"), "true");
+
+            // metrics port should be passed to JavaInstanceStarter with 
--metrics_port argument
+            
assertTrue(container.getProcessArgs().stream().collect(Collectors.joining(" "))
+                    .contains("--metrics_port " + expectedPort));
+        } else {
+            // No prometheus annotations should exist
+            assertFalse(annotations.containsKey("prometheus.io/scrape"));
+            assertFalse(annotations.containsKey("prometheus.io/port"));
+            // metrics will be started on random port when the port isn't 
specified
+            // check that "--metrics_port 0" argument is passed
+            
assertTrue(container.getProcessArgs().stream().collect(Collectors.joining(" "))
+                    .contains("--metrics_port 0"));
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 7dd1e74..d94f54c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -68,7 +68,10 @@ public class FunctionsStatsGenerator {
                     if (functionRuntime != null) {
                         try {
 
-                            out.write(functionRuntime.getPrometheusMetrics());
+                            String prometheusMetrics = 
functionRuntime.getPrometheusMetrics();
+                            if (prometheusMetrics != null) {
+                                out.write(prometheusMetrics);
+                            }
 
                         } catch (IOException e) {
                             log.warn("Failed to collect metrics for function 
instance {}",

Reply via email to