This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c51c7200c2b86e9e5d58616ddaef6eda8e99b16 Author: Lari Hotari <[email protected]> AuthorDate: Wed Mar 2 15:00:42 2022 +0200 [Functions] Pass configured metricsPort to k8s runtime (#14502) (cherry picked from commit daed6a0b3ac94e77a3d7d4212ee297b9046317a2) --- conf/functions_worker.yml | 1 + .../runtime/kubernetes/KubernetesRuntime.java | 18 +++++--- .../kubernetes/KubernetesRuntimeFactory.java | 7 ++- .../runtime/kubernetes/KubernetesRuntimeTest.java | 52 ++++++++++++++++++++++ .../functions/worker/FunctionsStatsGenerator.java | 5 ++- 5 files changed, 76 insertions(+), 7 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 428c089..24d46e1 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -222,6 +222,7 @@ functionRuntimeFactoryConfigs: # # The port inside the function pod which is used by the worker to communicate with the pod # grpcPort: 9093 # # The port inside the function pod on which prometheus metrics are exposed +# # An empty value disables prometheus metrics. # metricsPort: 9094 # # The directory inside the function pod where nar packages will be extracted # narExtractionDirectory: diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index a04ce72..59d688a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -426,7 +426,11 @@ public class KubernetesRuntime implements Runtime { @Override public String getPrometheusMetrics() throws IOException { - return RuntimeUtils.getPrometheusMetrics(metricsPort); + if (metricsPort != null) { + return RuntimeUtils.getPrometheusMetrics(metricsPort); + } else { + return null; + } } @Override @@ -974,10 +978,14 @@ public class KubernetesRuntime implements Runtime { } private Map<String, String> getPrometheusAnnotations() { - final Map<String, String> annotations = new HashMap<>(); - annotations.put("prometheus.io/scrape", "true"); - annotations.put("prometheus.io/port", String.valueOf(metricsPort)); - return annotations; + if (metricsPort != null) { + final Map<String, String> annotations = new HashMap<>(); + annotations.put("prometheus.io/scrape", "true"); + annotations.put("prometheus.io/port", String.valueOf(metricsPort)); + return annotations; + } else { + return Collections.emptyMap(); + } } private Map<String, String> getLabels(Function.FunctionDetails functionDetails) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index d98a161..4b2c7e7 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -290,6 +290,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { String overriddenNamespace = manifestCustomizer.map((customizer) -> customizer.customizeNamespace(instanceConfig.getFunctionDetails(), jobNamespace)).orElse(jobNamespace); String overriddenName = manifestCustomizer.map((customizer) -> customizer.customizeName(instanceConfig.getFunctionDetails(), jobName)).orElse(jobName); + // pass metricsPort configured in functionRuntimeFactoryConfigs.metricsPort in functions_worker.yml + if (metricsPort != null) { + instanceConfig.setMetricsPort(metricsPort); + } + return new KubernetesRuntime( appsClient, coreClient, @@ -351,7 +356,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { if (k8Uri == null) { log.info("k8Uri is null thus going by defaults"); ApiClient cli; - if (submittingInsidePod) { + if (submittingInsidePod != null && submittingInsidePod) { log.info("Looks like we are inside a k8 pod ourselves. Initializing as cluster"); cli = Config.fromCluster(); } else { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index ef45d7f..995546c 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -29,12 +29,14 @@ import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.openapi.models.V1Container; import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodTemplateSpec; import io.kubernetes.client.openapi.models.V1ResourceRequirements; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1StatefulSet; import io.kubernetes.client.openapi.models.V1Toleration; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; @@ -42,6 +44,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.runtime.thread.ThreadRuntime; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.ConnectorsManager; @@ -66,6 +69,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -1145,4 +1149,52 @@ public class KubernetesRuntimeTest { String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2); assertTrue(containerCommand.contains(expectedDownloadCommand)); } + + @Test + public void shouldUseConfiguredMetricsPort() throws Exception { + assertMetricsPortConfigured(Collections.singletonMap("metricsPort", 12345), 12345); + } + + @Test + public void shouldUseDefaultMetricsPortWhenMetricsPortIsntSet() throws Exception { + assertMetricsPortConfigured(Collections.emptyMap(), 9094); + } + + @Test + public void shouldNotAddPrometheusAnnotationIfMetricsPortIsSetToEmpty() throws Exception { + assertMetricsPortConfigured(Collections.singletonMap("metricsPort", ""), -1); + } + + private void assertMetricsPortConfigured(Map<String, Object> functionRuntimeFactoryConfigs, + int expectedPort) throws Exception { + KubernetesRuntimeFactory kubernetesRuntimeFactory = new KubernetesRuntimeFactory(); + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs(functionRuntimeFactoryConfigs); + AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build(); + kubernetesRuntimeFactory.initialize(workerConfig, authenticationConfig, new DefaultSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty()); + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true); + KubernetesRuntime container = kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, 30l); + V1PodTemplateSpec template = container.createStatefulSet().getSpec().getTemplate(); + Map<String, String> annotations = + template.getMetadata().getAnnotations(); + if (expectedPort != -1) { + // metrics port should be passed to k8s annotation for prometheus scraping + assertEquals(annotations.get("prometheus.io/port"), String.valueOf(expectedPort)); + // scraping annotation should exist + assertEquals(annotations.get("prometheus.io/scrape"), "true"); + + // metrics port should be passed to JavaInstanceStarter with --metrics_port argument + assertTrue(container.getProcessArgs().stream().collect(Collectors.joining(" ")) + .contains("--metrics_port " + expectedPort)); + } else { + // No prometheus annotations should exist + assertFalse(annotations.containsKey("prometheus.io/scrape")); + assertFalse(annotations.containsKey("prometheus.io/port")); + // metrics will be started on random port when the port isn't specified + // check that "--metrics_port 0" argument is passed + assertTrue(container.getProcessArgs().stream().collect(Collectors.joining(" ")) + .contains("--metrics_port 0")); + } + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 7dd1e74..d94f54c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -68,7 +68,10 @@ public class FunctionsStatsGenerator { if (functionRuntime != null) { try { - out.write(functionRuntime.getPrometheusMetrics()); + String prometheusMetrics = functionRuntime.getPrometheusMetrics(); + if (prometheusMetrics != null) { + out.write(prometheusMetrics); + } } catch (IOException e) { log.warn("Failed to collect metrics for function instance {}",
