This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 61d013d [SPARK-48984] Add Controller Metrics System and Utils
61d013d is described below
commit 61d013d7cc2510c8169fccca30cb7103b75783ef
Author: zhou-jiang <[email protected]>
AuthorDate: Wed Jul 24 22:46:23 2024 -0700
[SPARK-48984] Add Controller Metrics System and Utils
### What changes were proposed in this pull request?
This is a breakdown PR of #12. It defines the metrics system and utility classes
to be used by the reconcilers.
It also refactors a few methods from the previous utils class
`org.apache.spark.k8s.operator.reconciler.SparkReconcilerUtils` into a common
utils package.
### Why are the changes needed?
Splitting the change into smaller breakdown PRs lets us move with more flexibility.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CIs
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #23 from jiangzho/controller_utils.
Authored-by: zhou-jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
spark-operator/build.gradle | 20 +
.../operator/client/KubernetesClientFactory.java | 51 +++
.../k8s/operator/config/SparkOperatorConf.java | 467 +++++++++++++++++++++
.../k8s/operator/listeners/BaseStatusListener.java | 28 ++
.../operator/listeners/SparkAppStatusListener.java | 26 ++
.../spark/k8s/operator/metrics/JVMMetricSet.java | 74 ++++
.../spark/k8s/operator/metrics/MetricsService.java | 57 +++
.../spark/k8s/operator/metrics/MetricsSystem.java | 146 +++++++
.../k8s/operator/metrics/MetricsSystemFactory.java | 105 +++++
.../metrics/PrometheusPullModelHandler.java | 78 ++++
.../source/KubernetesMetricsInterceptor.java | 172 ++++++++
.../metrics/source/OperatorJosdkMetrics.java | 312 ++++++++++++++
.../operator/metrics/source/OperatorJvmSource.java | 39 ++
.../k8s/operator/utils/ClassLoadingUtils.java | 68 +++
.../spark/k8s/operator/utils/LoggingUtils.java | 64 +++
.../apache/spark/k8s/operator/utils/PodPhase.java | 58 +++
.../apache/spark/k8s/operator/utils/PodUtils.java | 98 +++++
.../apache/spark/k8s/operator/utils/ProbeUtil.java | 61 +++
.../k8s/operator/utils/SparkAppStatusUtils.java | 73 ++++
.../k8s/operator/utils/SparkExceptionUtils.java | 38 ++
.../org/apache/spark/k8s/operator/utils/Utils.java | 113 +++++
spark-operator/src/main/resources/EcsLayout.json | 49 +++
.../operator/metrics/MetricsSystemFactoryTest.java | 52 +++
.../k8s/operator/metrics/MetricsSystemTest.java | 77 ++++
.../spark/k8s/operator/metrics/sink/MockSink.java | 69 +++
.../source/KubernetesMetricsInterceptorTest.java | 153 +++++++
.../metrics/source/OperatorJosdkMetricsTest.java | 204 +++++++++
.../apache/spark/k8s/operator/utils/TestUtils.java | 63 +++
.../src/test/resources/log4j2.properties | 52 +++
29 files changed, 2867 insertions(+)
diff --git a/spark-operator/build.gradle b/spark-operator/build.gradle
index 8996708..6a733c6 100644
--- a/spark-operator/build.gradle
+++ b/spark-operator/build.gradle
@@ -21,6 +21,15 @@ dependencies {
implementation("org.apache.logging.log4j:log4j-1.2-api:$log4jVersion")
implementation("org.apache.logging.log4j:log4j-layout-template-json:$log4jVersion")
+ // metrics
+
implementation("io.dropwizard.metrics:metrics-core:$dropwizardMetricsVersion")
+ implementation("io.dropwizard.metrics:metrics-jvm:$dropwizardMetricsVersion")
+ compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
+ exclude group: 'com.squareup.okio'
+ exclude group: 'com.squareup.okhttp3'
+ exclude group: "org.apache.logging.log4j"
+ exclude group: "org.slf4j"
+ }
compileOnly("org.projectlombok:lombok:$lombokVersion")
annotationProcessor("org.projectlombok:lombok:$lombokVersion")
@@ -30,6 +39,17 @@ dependencies {
exclude group: 'com.squareup.okio'
exclude group: 'io.fabric8'
}
+ testImplementation("io.fabric8:kubernetes-server-mock:$fabric8Version") {
+ exclude group: 'junit'
+ exclude group: 'com.squareup.okhttp3'
+ }
+
testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
+ exclude group: 'com.squareup.okio'
+ exclude group: 'com.squareup.okhttp3'
+ exclude group: "org.apache.logging.log4j"
+ exclude group: "org.slf4j"
+ }
+ testImplementation("com.squareup.okhttp3:mockwebserver:$okHttpVersion")
testImplementation platform("org.junit:junit-bom:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
new file mode 100644
index 0000000..6d3e213
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.spark.k8s.operator.client;
+
+import java.util.List;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+
+/**
+ * Builds {@link KubernetesClient} instances backed by an OkHttp client, registering the supplied
+ * OkHttp interceptors (for example, metrics interceptors) on the underlying HTTP client builder.
+ */
+public class KubernetesClientFactory {
+  /**
+   * Builds a client without an explicit {@link Config} (a {@code null} config is passed to the
+   * builder).
+   *
+   * @param interceptors OkHttp interceptors to register, in list order; {@code null} is treated as
+   *     an empty list
+   * @return the built client
+   */
+  public static KubernetesClient buildKubernetesClient(final List<Interceptor> interceptors) {
+    return buildKubernetesClient(interceptors, null);
+  }
+
+  /**
+   * Builds a client from the given configuration and registers the given interceptors.
+   *
+   * @param interceptors OkHttp interceptors to register, in list order; {@code null} is treated as
+   *     an empty list
+   * @param kubernetesClientConfig configuration handed to {@link
+   *     KubernetesClientBuilder#withConfig}; may be {@code null}
+   * @return the built client
+   */
+  public static KubernetesClient buildKubernetesClient(
+      final List<Interceptor> interceptors, final Config kubernetesClientConfig) {
+    return new KubernetesClientBuilder()
+        .withConfig(kubernetesClientConfig)
+        .withHttpClientFactory(
+            new OkHttpClientFactory() {
+              @Override
+              protected void additionalConfig(OkHttpClient.Builder builder) {
+                // A null list means "no extra interceptors" instead of an NPE while iterating.
+                if (interceptors == null) {
+                  return;
+                }
+                for (Interceptor interceptor : interceptors) {
+                  builder.addInterceptor(interceptor);
+                }
+              }
+            })
+        .build();
+  }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
new file mode 100644
index 0000000..a3f791a
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.config;
+
+import java.time.Duration;
+
+import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
+import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
+import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
+import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.utils.Utils;
+
+/**
+ * Spark Operator configuration options, plus helpers that turn some of them into Java Operator SDK
+ * settings (leader election, retry policy and rate limiter).
+ */
+@Slf4j
+public class SparkOperatorConf {
+  public static final ConfigOption<String> OPERATOR_APP_NAME =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.name")
+          .enableDynamicOverride(false)
+          .description("Name of the operator.")
+          .typeParameterClass(String.class)
+          .defaultValue("spark-kubernetes-operator")
+          .build();
+
+  public static final ConfigOption<String> OPERATOR_NAMESPACE =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.namespace")
+          .enableDynamicOverride(false)
+          .description("Namespace that operator is deployed within.")
+          .typeParameterClass(String.class)
+          .defaultValue("default")
+          .build();
+
+  public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.watchedNamespaces")
+          .enableDynamicOverride(true)
+          .description(
+              "Comma-separated list of namespaces that the operator would be watching for "
+                  + "Spark resources. If set to '*', operator would watch all namespaces.")
+          .typeParameterClass(String.class)
+          .defaultValue("default")
+          .build();
+
+  public static final ConfigOption<Boolean> TERMINATE_ON_INFORMER_FAILURE_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.terminateOnInformerFailureEnabled")
+          .enableDynamicOverride(false)
+          .description(
+              "Enable to indicate informer errors should stop operator startup. If "
+                  + "disabled, operator startup will ignore recoverable errors, "
+                  + "caused for example by RBAC issues and will retry "
+                  + "periodically.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(false)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_TERMINATION_TIMEOUT_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.terminationTimeoutSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Grace period for operator shutdown before reconciliation threads are killed.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(30)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_PARALLELISM =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.parallelism")
+          .enableDynamicOverride(false)
+          .description(
+              "Thread pool size for Spark Operator reconcilers. Unbounded pool would be used if "
+                  + "set to non-positive number.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(50)
+          .build();
+
+  public static final ConfigOption<Long> RECONCILER_FOREGROUND_REQUEST_TIMEOUT_SECONDS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.reconciler.foregroundRequestTimeoutSeconds")
+          .enableDynamicOverride(true)
+          .description(
+              "Timeout (in seconds) for requests made to API server. This "
+                  + "applies only to foreground requests.")
+          .typeParameterClass(Long.class)
+          .defaultValue(30L)
+          .build();
+
+  public static final ConfigOption<Long> RECONCILER_INTERVAL_SECONDS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.reconciler.intervalSeconds")
+          .enableDynamicOverride(true)
+          .description(
+              "Interval (in seconds, non-negative) to reconcile Spark applications. Note that "
+                  + "reconciliation is always expected to be triggered when app spec / status is "
+                  + "updated. This interval controls the reconcile behavior of operator "
+                  + "reconciliation even when there's no update on SparkApplication, e.g. to "
+                  + "determine whether a hanging app needs to be proactively terminated. Thus "
+                  + "this is recommended to set to above 2 minutes to avoid unnecessary no-op "
+                  + "reconciliation.")
+          .typeParameterClass(Long.class)
+          .defaultValue(120L)
+          .build();
+
+  public static final ConfigOption<Boolean> TRIM_ATTEMPT_STATE_TRANSITION_HISTORY =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.reconciler.trimStateTransitionHistoryEnabled")
+          .enableDynamicOverride(true)
+          .description(
+              "When enabled, operator would trim state transition history when a "
+                  + "new attempt starts, keeping previous attempt summary only.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(true)
+          .build();
+
+  public static final ConfigOption<String> SPARK_APP_STATUS_LISTENER_CLASS_NAMES =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.reconciler.appStatusListenerClassNames")
+          .enableDynamicOverride(false)
+          .description("Comma-separated names of SparkAppStatusListener class implementations")
+          .typeParameterClass(String.class)
+          .defaultValue("")
+          .build();
+
+  public static final ConfigOption<Boolean> DYNAMIC_CONFIG_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.enabled")
+          .enableDynamicOverride(false)
+          .description(
+              "When enabled, operator would use config map as source of truth for config "
+                  + "property override. The config map needs to be created in "
+                  + "spark.kubernetes.operator.namespace, and labeled with operator name.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(false)
+          .build();
+
+  public static final ConfigOption<String> DYNAMIC_CONFIG_SELECTOR =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.selector")
+          .enableDynamicOverride(false)
+          .description("The selector str applied to dynamic config map.")
+          .typeParameterClass(String.class)
+          .defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels()))
+          .build();
+
+  public static final ConfigOption<Integer> DYNAMIC_CONFIG_RECONCILER_PARALLELISM =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.reconcilerParallelism")
+          .enableDynamicOverride(false)
+          .description(
+              "Parallelism for dynamic config reconciler. Unbounded pool would be used "
+                  + "if set to non-positive number.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(1)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_RATE_LIMITER_REFRESH_PERIOD_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.rateLimiter.refreshPeriodSeconds")
+          .enableDynamicOverride(false)
+          .description("Operator rate limiter refresh period(in seconds) for each resource.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(15)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_RATE_LIMITER_MAX_LOOP_FOR_PERIOD =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.rateLimiter.maxLoopForPeriod")
+          .enableDynamicOverride(false)
+          .description(
+              "Max number of reconcile loops triggered within the rate limiter refresh "
+                  + "period for each resource. Setting the limit <= 0 disables the "
+                  + "limiter.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(5)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_RETRY_INITIAL_INTERVAL_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.retry.initialIntervalSeconds")
+          .enableDynamicOverride(false)
+          .description("Initial interval(in seconds) of retries on unhandled controller errors.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(5)
+          .build();
+
+  public static final ConfigOption<Double> RECONCILER_RETRY_INTERVAL_MULTIPLIER =
+      ConfigOption.<Double>builder()
+          .key("spark.kubernetes.operator.reconciler.retry.intervalMultiplier")
+          .enableDynamicOverride(false)
+          .description(
+              "Interval multiplier of retries on unhandled controller errors. Set "
+                  + "this to 1 for linear retry.")
+          .typeParameterClass(Double.class)
+          .defaultValue(1.5)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_RETRY_MAX_INTERVAL_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.retry.maxIntervalSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Max interval(in seconds) of retries on unhandled controller errors. "
+                  + "Set to non-positive for unlimited.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(-1)
+          .build();
+
+  public static final ConfigOption<Integer> API_RETRY_MAX_ATTEMPTS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.api.retryMaxAttempts")
+          .enableDynamicOverride(false)
+          .description(
+              "Max attempts of retries on unhandled controller errors. Setting this to "
+                  + "non-positive value means no retry.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(15)
+          .build();
+
+  public static final ConfigOption<Long> API_RETRY_ATTEMPT_AFTER_SECONDS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.api.retryAttemptAfterSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Default time (in seconds) to wait till next request. This would be used if "
+                  + "server does not set Retry-After in response. Setting this to non-positive "
+                  + "number means immediate retry.")
+          .typeParameterClass(Long.class)
+          .defaultValue(1L)
+          .build();
+
+  public static final ConfigOption<Long> API_STATUS_PATCH_MAX_ATTEMPTS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.api.statusPatchMaxAttempts")
+          .enableDynamicOverride(false)
+          .description(
+              "Maximal number of retry attempts of requests to k8s server for resource "
+                  + "status update. This would be performed on top of k8s client "
+                  + "spark.kubernetes.operator.api.retryMaxAttempts to overcome potential "
+                  + "conflicting update on the same SparkApplication. This should be positive "
+                  + "number.")
+          .typeParameterClass(Long.class)
+          .defaultValue(3L)
+          .build();
+
+  public static final ConfigOption<Long> API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts")
+          .enableDynamicOverride(false)
+          .description(
+              "Maximal number of retry attempts of requesting secondary resource for Spark "
+                  + "application. This would be performed on top of k8s client "
+                  + "spark.kubernetes.operator.api.retryMaxAttempts to overcome potential "
+                  + "conflicting reconcile on the same SparkApplication. This should be "
+                  + "positive number")
+          .typeParameterClass(Long.class)
+          .defaultValue(3L)
+          .build();
+
+  public static final ConfigOption<Boolean> JOSDK_METRICS_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.metrics.josdkMetricsEnabled")
+          .enableDynamicOverride(false)
+          .description(
+              "When enabled, the josdk metrics will be added in metrics source and "
+                  + "configured for operator.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(true)
+          .build();
+
+  public static final ConfigOption<Boolean> KUBERNETES_CLIENT_METRICS_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.metrics.clientMetricsEnabled")
+          .enableDynamicOverride(false)
+          .description(
+              "Enable KubernetesClient metrics for measuring the HTTP traffic to "
+                  + "the Kubernetes API Server. Since the metrics are collected "
+                  + "via OkHttp interceptors, they can be disabled when opting in to "
+                  + "customized interceptors.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(true)
+          .build();
+
+  public static final ConfigOption<Boolean>
+      KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED =
+          ConfigOption.<Boolean>builder()
+              .key("spark.kubernetes.operator.metrics.clientMetricsGroupByResponseCodeEnabled")
+              .enableDynamicOverride(false)
+              .description(
+                  "When enabled, additional metrics group by http response code group(1xx, "
+                      + "2xx, 3xx, 4xx, 5xx) received from API server will be added. Users "
+                      + "can disable it when their monitoring system can combine lower level "
+                      + "kubernetes.client.http.response.<3-digit-response-code> metrics.")
+              .typeParameterClass(Boolean.class)
+              .defaultValue(true)
+              .build();
+
+  public static final ConfigOption<Integer> OPERATOR_METRICS_PORT =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.metrics.port")
+          .enableDynamicOverride(false)
+          .description("The port used for checking metrics")
+          .typeParameterClass(Integer.class)
+          .defaultValue(19090)
+          .build();
+
+  public static final ConfigOption<Integer> OPERATOR_PROBE_PORT =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.health.probePort")
+          .enableDynamicOverride(false)
+          .description("The port used for health/readiness check probe status.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(19091)
+          .build();
+
+  public static final ConfigOption<Integer> SENTINEL_EXECUTOR_SERVICE_POOL_SIZE =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.health.sentinelExecutorPoolSize")
+          .enableDynamicOverride(false)
+          .description(
+              "Size of executor service in Sentinel Managers to check the health "
+                  + "of sentinel resources.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(3)
+          .build();
+
+  public static final ConfigOption<Integer> SENTINEL_RESOURCE_RECONCILIATION_DELAY =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.health.sentinelResourceReconciliationDelaySeconds")
+          .enableDynamicOverride(true)
+          .description(
+              "Allowed max time(seconds) between spec update and reconciliation "
+                  + "for sentinel resources.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(60)
+          .build();
+
+  public static final ConfigOption<Boolean> LEADER_ELECTION_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.leaderElection.enabled")
+          .enableDynamicOverride(false)
+          .description(
+              "Enable leader election for the operator to allow running standby instances. When "
+                  + "this is disabled, only one operator instance is expected to be up and "
+                  + "running at any time (replica = 1) to avoid race condition.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(false)
+          .build();
+
+  public static final ConfigOption<String> LEADER_ELECTION_LEASE_NAME =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.leaderElection.leaseName")
+          .enableDynamicOverride(false)
+          .description(
+              "Leader election lease name, must be unique for leases in the same namespace.")
+          .typeParameterClass(String.class)
+          .defaultValue("spark-operator-lease")
+          .build();
+
+  public static final ConfigOption<Integer> LEADER_ELECTION_LEASE_DURATION_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.leaderElection.leaseDurationSeconds")
+          .enableDynamicOverride(false)
+          .description("Leader election lease duration in seconds, positive.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(180)
+          .build();
+
+  public static final ConfigOption<Integer> LEADER_ELECTION_RENEW_DEADLINE_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.leaderElection.renewDeadlineSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Leader election renew deadline in seconds, positive. This needs to be "
+                  + "smaller than the lease duration to allow current leader renew the lease "
+                  + "before lease expires.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(120)
+          .build();
+
+  public static final ConfigOption<Integer> LEADER_ELECTION_RETRY_PERIOD_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.leaderElection.retryPeriodSeconds")
+          .enableDynamicOverride(false)
+          .description("Leader election retry period in seconds, positive.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(5)
+          .build();
+
+  /**
+   * Builds the leader election settings from the leader election options and the operator
+   * namespace. Lease duration, renew deadline and retry period values below 1 are replaced by 1
+   * (with a warning).
+   *
+   * @return leader election configuration for the operator
+   */
+  public static LeaderElectionConfiguration getLeaderElectionConfig() {
+    return new LeaderElectionConfiguration(
+        LEADER_ELECTION_LEASE_NAME.getValue(),
+        OPERATOR_NAMESPACE.getValue(),
+        Duration.ofSeconds(ensurePositiveIntFor(LEADER_ELECTION_LEASE_DURATION_SECONDS)),
+        Duration.ofSeconds(ensurePositiveIntFor(LEADER_ELECTION_RENEW_DEADLINE_SECONDS)),
+        Duration.ofSeconds(ensurePositiveIntFor(LEADER_ELECTION_RETRY_PERIOD_SECONDS)));
+  }
+
+  /**
+   * Builds the reconciler retry policy. Max attempts and initial interval values below 0 are
+   * replaced by 0 (with a warning). A max interval is set on the policy only when the configured
+   * value is positive.
+   *
+   * @return retry policy for the reconciler
+   */
+  public static GenericRetry getOperatorRetry() {
+    GenericRetry genericRetry =
+        new GenericRetry()
+            .setMaxAttempts(ensureNonNegativeIntFor(API_RETRY_MAX_ATTEMPTS))
+            .setInitialInterval(
+                Duration.ofSeconds(
+                        ensureNonNegativeIntFor(RECONCILER_RETRY_INITIAL_INTERVAL_SECONDS))
+                    .toMillis())
+            .setIntervalMultiplier(RECONCILER_RETRY_INTERVAL_MULTIPLIER.getValue());
+    if (RECONCILER_RETRY_MAX_INTERVAL_SECONDS.getValue() > 0) {
+      genericRetry.setMaxInterval(
+          Duration.ofSeconds(RECONCILER_RETRY_MAX_INTERVAL_SECONDS.getValue()).toMillis());
+    } else {
+      // This branch concerns the interval cap; max attempts were already set above.
+      log.info("Reconciler retry policy is configured with unlimited max interval");
+    }
+    return genericRetry;
+  }
+
+  /**
+   * Builds the per-resource linear rate limiter. Refresh period and max loop values below 0 are
+   * replaced by 0 (with a warning).
+   *
+   * @return rate limiter for the reconciler
+   */
+  public static RateLimiter<?> getOperatorRateLimiter() {
+    return new LinearRateLimiter(
+        Duration.ofSeconds(
+            ensureNonNegativeIntFor(RECONCILER_RATE_LIMITER_REFRESH_PERIOD_SECONDS)),
+        ensureNonNegativeIntFor(RECONCILER_RATE_LIMITER_MAX_LOOP_FOR_PERIOD));
+  }
+
+  /** Returns the option's value if it is at least 0; otherwise warns and returns 0. */
+  private static int ensureNonNegativeIntFor(ConfigOption<Integer> configOption) {
+    return ensureValid(configOption.getValue(), configOption.getDescription(), 0, 0);
+  }
+
+  /** Returns the option's value if it is at least 1; otherwise warns and returns 1. */
+  private static int ensurePositiveIntFor(ConfigOption<Integer> configOption) {
+    return ensureValid(configOption.getValue(), configOption.getDescription(), 1, 1);
+  }
+
+  /**
+   * Validates that {@code value} is at least {@code minValue}.
+   *
+   * @param value requested value
+   * @param description human-readable description of the setting, used in messages
+   * @param minValue smallest accepted value (inclusive)
+   * @param defaultValue value returned instead of an out-of-range {@code value}; must itself be at
+   *     least {@code minValue}
+   * @return {@code value} when it is at least {@code minValue}, otherwise {@code defaultValue}
+   * @throws IllegalArgumentException if {@code value} is out of range and {@code defaultValue} is
+   *     below {@code minValue} as well
+   */
+  private static int ensureValid(int value, String description, int minValue, int defaultValue) {
+    if (value >= minValue) {
+      return value;
+    }
+    if (defaultValue < minValue) {
+      throw new IllegalArgumentException(
+          "Default value for " + description + " must be at least " + minValue);
+    }
+    // The replacement is the caller-supplied fallback, not the option's declared default.
+    log.warn(
+        "Requested {} should be at least {}. Requested: {}, using {} instead",
+        description,
+        minValue,
+        value,
+        defaultValue);
+    return defaultValue;
+  }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/BaseStatusListener.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/BaseStatusListener.java
new file mode 100644
index 0000000..17076b8
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/BaseStatusListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.spark.k8s.operator.listeners;
+
+import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.status.BaseStatus;
+
+/**
+ * Base type for custom listeners which, when added, are told about status changes of a resource.
+ *
+ * @param <STATUS> status type carried by the resource
+ * @param <CR> resource type whose status is observed
+ */
+public abstract class BaseStatusListener<
+    STATUS extends BaseStatus<?, ?, ?>,
+    CR extends BaseResource<?, ?, ?, ?, STATUS>> {
+
+  /**
+   * Receives a resource together with its status before and after an update.
+   *
+   * @param resource the resource the statuses belong to
+   * @param prevStatus status prior to the update
+   * @param updatedStatus status after the update
+   */
+  public abstract void listenStatus(CR resource, STATUS prevStatus, STATUS updatedStatus);
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/SparkAppStatusListener.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/SparkAppStatusListener.java
new file mode 100644
index 0000000..018e059
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/SparkAppStatusListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.spark.k8s.operator.listeners;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+
+/**
+ * Base class for custom listeners, if added (see {@code
+ * spark.kubernetes.operator.reconciler.appStatusListenerClassNames}), that observe status changes
+ * of {@link SparkApplication} resources.
+ */
+public abstract class SparkAppStatusListener
+    extends BaseStatusListener<ApplicationStatus, SparkApplication> {
+  // Only binds the generic parameters to Spark application types; declares no members.
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/JVMMetricSet.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/JVMMetricSet.java
new file mode 100644
index 0000000..a87d5bd
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/JVMMetricSet.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+
+/**
+ * Combines the codahale JVM metric sets (buffer pools, GC, memory usage, thread states) and the
+ * file descriptor ratio gauge into one {@link MetricSet}. Metrics from each set are keyed as
+ * {@code <prefix>.<original name>}.
+ */
+public class JVMMetricSet implements MetricSet {
+
+  /** Key of the open/max file descriptor ratio gauge; see codahale FileDescriptorRatioGauge. */
+  public static final String FILE_DESC_RATIO_NAME = "fileDesc.ratio.open/max";
+
+  public static final String PREFIX_BUFFER_POOL = "bufferPool";
+  public static final String PREFIX_GC = "gc";
+  public static final String PREFIX_MEMORY_USAGE = "memoryUsage";
+  public static final String PREFIX_THREADS_STATES = "threadStates";
+
+  private final BufferPoolMetricSet bufferPoolMetricSet =
+      new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer());
+  private final FileDescriptorRatioGauge fileDescriptorRatioGauge =
+      new FileDescriptorRatioGauge();
+  private final GarbageCollectorMetricSet garbageCollectorMetricSet =
+      new GarbageCollectorMetricSet();
+  private final MemoryUsageGaugeSet memoryUsageGaugeSet = new MemoryUsageGaugeSet();
+  private final ThreadStatesGaugeSet threadStatesGaugeSet = new ThreadStatesGaugeSet();
+
+  /**
+   * Returns a fresh map containing every JVM metric, keyed by prefixed name.
+   *
+   * @return newly built map of metric name to metric
+   */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Map<String, Metric> all = new HashMap<>();
+    copyWithPrefix(bufferPoolMetricSet, PREFIX_BUFFER_POOL, all);
+    all.put(FILE_DESC_RATIO_NAME, fileDescriptorRatioGauge);
+    copyWithPrefix(garbageCollectorMetricSet, PREFIX_GC, all);
+    copyWithPrefix(memoryUsageGaugeSet, PREFIX_MEMORY_USAGE, all);
+    copyWithPrefix(threadStatesGaugeSet, PREFIX_THREADS_STATES, all);
+    return all;
+  }
+
+  /** Copies each metric of {@code source} into {@code target} under {@code prefix + "." + name}. */
+  private static void copyWithPrefix(
+      MetricSet source, String prefix, Map<String, Metric> target) {
+    source.getMetrics().forEach((name, metric) -> target.put(prefix + "." + name, metric));
+  }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsService.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsService.java
new file mode 100644
index 0000000..a4af354
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_METRICS_PORT;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+
+import com.sun.net.httpserver.HttpServer;
+import lombok.extern.slf4j.Slf4j;
+
+/** Start Http service at endpoint /prometheus, exposing operator metrics */
+@Slf4j
+public class MetricsService {
+ HttpServer server;
+ MetricsSystem metricsSystem;
+
+ public MetricsService(MetricsSystem metricsSystem, Executor executor) {
+ this.metricsSystem = metricsSystem;
+ try {
+ server = HttpServer.create(new
InetSocketAddress(OPERATOR_METRICS_PORT.getValue()), 0);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create Metrics Server", e);
+ }
+ server.setExecutor(executor);
+ }
+
+ public void start() {
+ log.info("Starting Metrics Service for Prometheus ...");
+ server.createContext("/prometheus",
metricsSystem.getPrometheusPullModelHandler());
+ server.start();
+ }
+
+ public void stop() {
+ log.info("Metrics Service stopped");
+ server.stop(0);
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java
new file mode 100644
index 0000000..5dcd61b
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.metrics.source.OperatorJvmSource;
+import org.apache.spark.metrics.sink.Sink;
+import org.apache.spark.metrics.source.Source;
+
+@Slf4j
+public class MetricsSystem {
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ @Getter private final Set<Sink> sinks;
+ @Getter private final Set<Source> sources;
+ @Getter private final MetricRegistry registry;
+ @Getter private final Properties properties;
+ // PrometheusPullModelHandler is registered by default, metrics exposed via
http port
+ @Getter private final PrometheusPullModelHandler prometheusPullModelHandler;
+ private final Map<String, SinkProperties> sinkPropertiesMap;
+
+ public MetricsSystem() {
+ this(new Properties());
+ }
+
+ public MetricsSystem(Properties properties) {
+ this.sources = new HashSet<>();
+ this.sinks = new HashSet<>();
+ this.registry = new MetricRegistry();
+ this.properties = properties;
+ this.sinkPropertiesMap =
MetricsSystemFactory.parseSinkProperties(this.properties);
+ // Add default sinks
+ this.prometheusPullModelHandler = new PrometheusPullModelHandler(new
Properties(), registry);
+ this.sinks.add(prometheusPullModelHandler);
+ }
+
+ public void start() {
+ if (running.get()) {
+ throw new IllegalStateException(
+ "Attempting to start a MetricsSystem that is already running");
+ }
+ running.set(true);
+ registerDefaultSources();
+ registerSinks();
+ sinks.forEach(Sink::start);
+ }
+
+ public void stop() {
+ if (running.get()) {
+ sinks.forEach(Sink::stop);
+ registry.removeMatching(MetricFilter.ALL);
+ } else {
+ log.error("Stopping a MetricsSystem that is not running");
+ }
+ running.set(false);
+ }
+
+ public void report() {
+ sinks.forEach(Sink::report);
+ }
+
+ protected void registerDefaultSources() {
+ registerSource(new OperatorJvmSource());
+ }
+
+ protected void registerSinks() {
+ log.info("sinkPropertiesMap: {}", sinkPropertiesMap);
+ sinkPropertiesMap
+ .values()
+ .forEach(
+ sinkProp -> {
+ try {
+ Class<Sink> sinkClass = (Class<Sink>)
Class.forName(sinkProp.getClassName());
+ Sink sinkInstance;
+ sinkInstance =
+ sinkClass
+ .getConstructor(Properties.class, MetricRegistry.class)
+ .newInstance(sinkProp.getProperties(), registry);
+ sinks.add(sinkInstance);
+ } catch (InstantiationException
+ | IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | SecurityException
+ | ClassNotFoundException e) {
+ if (log.isErrorEnabled()) {
+ log.error(
+ "Fail to create metrics sink for sink name {}, sink
properties {}",
+ sinkProp.getClassName(),
+ sinkProp.getProperties());
+ }
+ throw new RuntimeException("Fail to create metrics sink", e);
+ }
+ });
+ }
+
+ public void registerSource(Source source) {
+ sources.add(source);
+ try {
+ String regName = MetricRegistry.name(source.sourceName());
+ registry.register(regName, source.metricRegistry());
+ } catch (IllegalArgumentException e) {
+ log.error("Metrics already registered", e);
+ }
+ }
+
+ @Data
+ public static class SinkProperties {
+ String className;
+ Properties properties;
+
+ public SinkProperties() {
+ this.className = "";
+ this.properties = new Properties();
+ }
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactory.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactory.java
new file mode 100644
index 0000000..88d357c
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.k8s.operator.config.SparkOperatorConfManager;
+
+/** Factory for MetricsSystem */
+public class MetricsSystemFactory {
+ public static final String METRIC_PREFIX = "spark.metrics.conf.operator.";
+ public static final String SINK = "sink.";
+ public static final String CLASS = "class";
+
+ public static MetricsSystem createMetricsSystem() {
+ Properties properties =
+
parseMetricsProperties(SparkOperatorConfManager.INSTANCE.getMetricsProperties());
+ return new MetricsSystem(properties);
+ }
+
+ public static MetricsSystem createMetricsSystem(Properties properties) {
+ return new MetricsSystem(properties);
+ }
+
+ private static Properties parseMetricsProperties(Properties userProperties) {
+ Properties properties = new Properties();
+ Enumeration<?> valueEnumeration = userProperties.propertyNames();
+ while (valueEnumeration.hasMoreElements()) {
+ String key = (String) valueEnumeration.nextElement();
+ if (key.startsWith(METRIC_PREFIX)) {
+ properties.put(key.substring(METRIC_PREFIX.length()),
userProperties.getProperty(key));
+ }
+ }
+ return properties;
+ }
+
+ public static Map<String, MetricsSystem.SinkProperties> parseSinkProperties(
+ Properties metricsProperties) {
+ Map<String, MetricsSystem.SinkProperties> propertiesMap = new HashMap<>();
+ // e.g: "sink.graphite.class"="org.apache.spark.metrics.sink.ConsoleSink"
+ Enumeration<?> valueEnumeration = metricsProperties.propertyNames();
+ while (valueEnumeration.hasMoreElements()) {
+ String key = (String) valueEnumeration.nextElement();
+ int firstDotIndex = StringUtils.ordinalIndexOf(key, ".", 1);
+ int secondDotIndex = StringUtils.ordinalIndexOf(key, ".", 2);
+ if (key.startsWith(SINK)) {
+ String shortName = key.substring(firstDotIndex + 1, secondDotIndex);
+ MetricsSystem.SinkProperties sinkProperties =
+ propertiesMap.getOrDefault(shortName, new
MetricsSystem.SinkProperties());
+ if (key.endsWith(CLASS)) {
+ sinkProperties.setClassName(metricsProperties.getProperty(key));
+ } else {
+ sinkProperties
+ .getProperties()
+ .put(key.substring(secondDotIndex + 1),
metricsProperties.getProperty(key));
+ }
+ propertiesMap.put(shortName, sinkProperties);
+ }
+ }
+ sinkPropertiesSanityCheck(propertiesMap);
+ return propertiesMap;
+ }
+
+ private static void sinkPropertiesSanityCheck(
+ Map<String, MetricsSystem.SinkProperties> sinkPropsMap) {
+ for (Map.Entry<String, MetricsSystem.SinkProperties> pair :
sinkPropsMap.entrySet()) {
+ // Each Sink should have mapping class full name
+ if (StringUtils.isBlank(pair.getValue().className)) {
+ String errorMessage =
+ String.format(
+ "%s provides properties, but does not provide full class
name", pair.getKey());
+ throw new RuntimeException(errorMessage);
+ }
+ // Check the existence of each class full name
+ try {
+ Class.forName(pair.getValue().getClassName());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(
+ String.format("Fail to find class %s",
pair.getValue().getClassName()), e);
+ }
+ }
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java
new file mode 100644
index 0000000..73bb49b
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static org.apache.spark.k8s.operator.utils.ProbeUtil.sendMessage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import com.codahale.metrics.MetricRegistry;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import jakarta.servlet.http.HttpServletRequest;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.metrics.sink.PrometheusServlet;
+
+/** Serves as simple Prometheus sink (pull model), presenting metrics snapshot
as HttpHandler */
+@Slf4j
+public class PrometheusPullModelHandler extends PrometheusServlet implements
HttpHandler {
+ private static final String EMPTY_RECORD_VALUE = "[]";
+
+ public PrometheusPullModelHandler(Properties properties, MetricRegistry
registry) {
+ super(properties, registry);
+ }
+
+ @Override
+ public void start() {
+ log.info("PrometheusPullModelHandler started");
+ }
+
+ @Override
+ public void stop() {
+ log.info("PrometheusPullModelHandler stopped");
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ HttpServletRequest httpServletRequest = null;
+ String value = getMetricsSnapshot(httpServletRequest);
+ sendMessage(exchange, 200, String.join("\n",
filterNonEmptyRecords(value)));
+ }
+
+ protected List<String> filterNonEmptyRecords(String metricsSnapshot) {
+ // filter out empty records which could cause Prometheus invalid syntax
exception, e.g:
+ // metrics_jvm_threadStates_deadlocks_Number{type="gauges"} []
+ // metrics_jvm_threadStates_deadlocks_Value{type="gauges"} []
+ String[] records = metricsSnapshot.split("\n");
+ List<String> filteredRecords = new ArrayList<>();
+ for (String record : records) {
+ String[] keyValuePair = record.split(" ");
+ if (EMPTY_RECORD_VALUE.equals(keyValuePair[1])) {
+ continue;
+ }
+ filteredRecords.add(record);
+ }
+ return filteredRecords;
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
new file mode 100644
index 0000000..5684b62
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.spark.metrics.source.Source;
+
+@Slf4j
+public class KubernetesMetricsInterceptor implements Interceptor, Source {
+ MetricRegistry metricRegistry;
+ public static final String NAMESPACES = "namespaces";
+ public static final String HTTP_REQUEST_GROUP = "http.request";
+ public static final String HTTP_REQUEST_FAILED_GROUP = "failed";
+ public static final String HTTP_RESPONSE_GROUP = "http.response";
+ public static final String HTTP_RESPONSE_1XX = "1xx";
+ public static final String HTTP_RESPONSE_2XX = "2xx";
+ public static final String HTTP_RESPONSE_3XX = "3xx";
+ public static final String HTTP_RESPONSE_4XX = "4xx";
+ public static final String HTTP_RESPONSE_5XX = "5xx";
+ private final Histogram responseLatency;
+ private final Map<Integer, Meter> responseCodeMeters = new
ConcurrentHashMap<>();
+ private final Map<String, Meter> requestMethodCounter = new
ConcurrentHashMap<>();
+ private final List<Meter> responseCodeGroupMeters = new ArrayList<>(5);
+ private final Meter requestFailedRateMeter;
+ private final Meter requestRateMeter;
+ private final Meter responseRateMeter;
+ private final Map<String, Meter> namespacedResourceMethodMeters = new
ConcurrentHashMap<>();
+
+ public KubernetesMetricsInterceptor() {
+ metricRegistry = new MetricRegistry();
+
+ responseLatency =
+ metricRegistry.histogram(
+ MetricRegistry.name(HTTP_RESPONSE_GROUP, "latency",
"nanos").toLowerCase());
+ requestFailedRateMeter =
+
metricRegistry.meter(MetricRegistry.name(HTTP_REQUEST_FAILED_GROUP).toLowerCase());
+ requestRateMeter =
metricRegistry.meter(MetricRegistry.name(HTTP_REQUEST_GROUP).toLowerCase());
+ responseRateMeter =
+
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_GROUP).toLowerCase());
+
+ if
(KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED.getValue()) {
+ responseCodeGroupMeters.add(
+
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_1XX).toLowerCase()));
+ responseCodeGroupMeters.add(
+
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_2XX).toLowerCase()));
+ responseCodeGroupMeters.add(
+
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_3XX).toLowerCase()));
+ responseCodeGroupMeters.add(
+
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_4XX).toLowerCase()));
+ responseCodeGroupMeters.add(
+
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_5XX).toLowerCase()));
+ }
+ }
+
+ @NotNull
+ @Override
+ public Response intercept(@NotNull Chain chain) throws IOException {
+ Request request = chain.request();
+ updateRequestMetrics(request);
+ Response response = null;
+ final long startTime = System.nanoTime();
+ try {
+ response = chain.proceed(request);
+ return response;
+ } finally {
+ updateResponseMetrics(response, startTime);
+ }
+ }
+
+ @Override
+ public String sourceName() {
+ return "kubernetes.client";
+ }
+
+ @Override
+ public MetricRegistry metricRegistry() {
+ return this.metricRegistry;
+ }
+
+ private void updateRequestMetrics(Request request) {
+ this.requestRateMeter.mark();
+ getMeterByRequestMethod(request.method()).mark();
+ Optional<Pair<String, String>> resourceNamePairOptional =
+ parseNamespaceScopedResource(request.url().uri().getPath());
+ resourceNamePairOptional.ifPresent(
+ pair -> {
+ getMeterByRequestMethodAndResourceName(pair.getValue(),
request.method()).mark();
+ getMeterByRequestMethodAndResourceName(
+ pair.getKey() + "." + pair.getValue(), request.method())
+ .mark();
+ });
+ }
+
+ private void updateResponseMetrics(Response response, long startTimeNanos) {
+ final long latency = System.nanoTime() - startTimeNanos;
+ if (response != null) {
+ this.responseRateMeter.mark();
+ this.responseLatency.update(latency);
+ getMeterByResponseCode(response.code()).mark();
+ if
(KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED.getValue()) {
+ responseCodeGroupMeters.get(response.code() / 100 - 1).mark();
+ }
+ } else {
+ this.requestFailedRateMeter.mark();
+ }
+ }
+
+ private Meter getMeterByRequestMethod(String method) {
+ return requestMethodCounter.computeIfAbsent(
+ method,
+ key -> metricRegistry.meter(MetricRegistry.name(HTTP_REQUEST_GROUP,
method).toLowerCase()));
+ }
+
+ private Meter getMeterByRequestMethodAndResourceName(String resourceName,
String method) {
+ String metricsName = MetricRegistry.name(resourceName, method);
+ return namespacedResourceMethodMeters.computeIfAbsent(
+ metricsName, key -> metricRegistry.meter(metricsName.toLowerCase()));
+ }
+
+ private Meter getMeterByResponseCode(int code) {
+ return responseCodeMeters.computeIfAbsent(
+ code,
+ key ->
+ metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_GROUP,
String.valueOf(code))));
+ }
+
+ public Optional<Pair<String, String>> parseNamespaceScopedResource(String
path) {
+ if (path.contains(NAMESPACES)) {
+ int index = path.indexOf(NAMESPACES) + NAMESPACES.length();
+ String namespaceAndResources = path.substring(index + 1);
+ String[] parts = namespaceAndResources.split("/");
+ return Optional.of(Pair.of(parts[0], parts[1]));
+ } else {
+ return Optional.empty();
+ }
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
new file mode 100644
index 0000000..6eb8867
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static
io.javaoperatorsdk.operator.api.reconciler.Constants.CONTROLLER_NAME;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.Controller;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
+import
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.metrics.source.Source;
+import org.apache.spark.util.Clock;
+import org.apache.spark.util.SystemClock;
+
+@Slf4j
+public class OperatorJosdkMetrics implements Source, Metrics {
+ public static final String FINISHED = "finished";
+ public static final String CLEANUP = "cleanup";
+ public static final String FAILED = "failed";
+ public static final String RETRIES = "retries";
+ private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
+ private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+ private final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
+ private static final String RECONCILIATION = "reconciliation";
+ private static final String RESOURCE = "resource";
+ private static final String EVENT = "event";
+ private static final String SUCCESS = "success";
+ private static final String FAILURE = "failure";
+ private static final String EXCEPTION = "exception";
+ private static final String PREFIX = "operator.sdk";
+ private static final String RECONCILIATIONS = "reconciliations";
+ private static final String RECONCILIATIONS_EXECUTIONS = RECONCILIATIONS +
".executions";
+ private static final String RECONCILIATIONS_QUEUE_SIZE = RECONCILIATIONS +
".queue.size";
+ private static final String SIZE = "size";
+
+ private final Clock clock;
+ private final MetricRegistry metricRegistry;
+
+ public OperatorJosdkMetrics() {
+ this.clock = new SystemClock();
+ this.metricRegistry = new MetricRegistry();
+ }
+
+ @Override
+ public String sourceName() {
+ return PREFIX;
+ }
+
+ @Override
+ public MetricRegistry metricRegistry() {
+ return metricRegistry;
+ }
+
+ @Override
+ public void controllerRegistered(Controller<? extends HasMetadata>
controller) {
+ // no-op
+ log.debug("Controller has been registered");
+ }
+
+ @Override
+ public void receivedEvent(Event event, Map<String, Object> metadata) {
+ log.debug("received event {}, metadata {}", event, metadata);
+ if (event instanceof ResourceEvent) {
+ final ResourceAction action = ((ResourceEvent) event).getAction();
+ final Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> resource =
+ getResourceClass(metadata);
+ final Optional<String> namespaceOptional =
event.getRelatedCustomResourceID().getNamespace();
+ resource.ifPresent(
+ aClass -> getCounter(aClass, action.name().toLowerCase(), RESOURCE,
EVENT).inc());
+ if (resource.isPresent() && namespaceOptional.isPresent()) {
+ getCounter(
+ resource.get(),
+ namespaceOptional.get(),
+ action.name().toLowerCase(),
+ RESOURCE,
+ EVENT)
+ .inc();
+ }
+ }
+ }
+
+ @Override
+ public <T> T timeControllerExecution(ControllerExecution<T> execution)
throws Exception {
+ log.debug("Time controller execution");
+ final String name = execution.controllerName();
+ final ResourceID resourceID = execution.resourceID();
+ final Optional<String> namespaceOptional = resourceID.getNamespace();
+ final Map<String, Object> metadata = execution.metadata();
+ final Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> resourceClass
=
+ getResourceClass(metadata);
+ final String execName = execution.name();
+
+ long startTime = clock.getTimeMillis();
+ try {
+ T result = execution.execute();
+ final String successType = execution.successTypeName(result);
+ if (resourceClass.isPresent()) {
+ getHistogram(resourceClass.get(), name, execName,
successType).update(toSeconds(startTime));
+ getCounter(resourceClass.get(), name, execName, SUCCESS,
successType).inc();
+ if (namespaceOptional.isPresent()) {
+ getHistogram(resourceClass.get(), namespaceOptional.get(), name,
execName, successType)
+ .update(toSeconds(startTime));
+ getCounter(
+ resourceClass.get(),
+ namespaceOptional.get(),
+ name,
+ execName,
+ SUCCESS,
+ successType)
+ .inc();
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ log.error(
+ "Controller execution failed for resource {}, metadata {}",
resourceID, metadata, e);
+ final String exception = e.getClass().getSimpleName();
+ if (resourceClass.isPresent()) {
+ getHistogram(resourceClass.get(), name, execName,
FAILURE).update(toSeconds(startTime));
+ getCounter(resourceClass.get(), name, execName, FAILURE, EXCEPTION,
exception).inc();
+ if (namespaceOptional.isPresent()) {
+ getHistogram(resourceClass.get(), namespaceOptional.get(), name,
execName, FAILURE)
+ .update(toSeconds(startTime));
+ getCounter(
+ resourceClass.get(),
+ namespaceOptional.get(),
+ name,
+ execName,
+ FAILURE,
+ EXCEPTION,
+ exception)
+ .inc();
+ }
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void reconcileCustomResource(
+ HasMetadata resource, RetryInfo retryInfo, Map<String, Object> metadata)
{
+ log.debug(
+ "Reconcile custom resource {}, with retryInfo {} metadata {}",
+ resource,
+ retryInfo,
+ metadata);
+ if (retryInfo != null) {
+ final String namespace = resource.getMetadata().getNamespace();
+ getCounter(resource.getClass(), RECONCILIATION, RETRIES).inc();
+ getCounter(resource.getClass(), namespace, RECONCILIATION,
RETRIES).inc();
+ }
+ getCounter(
+ resource.getClass(), (String) metadata.get(CONTROLLER_NAME),
RECONCILIATIONS_QUEUE_SIZE)
+ .inc();
+ }
+
+ @Override
+ public void failedReconciliation(
+ HasMetadata resource, Exception exception, Map<String, Object> metadata)
{
+ log.error(
+ "Failed reconciliation for resource {} with metadata {}", resource,
exception, exception);
+ getCounter(resource.getClass(), RECONCILIATION, FAILED).inc();
+ getCounter(resource.getClass(), resource.getMetadata().getNamespace(),
RECONCILIATION, FAILED)
+ .inc();
+ }
+
+ @Override
+ public void finishedReconciliation(HasMetadata resource, Map<String, Object>
metadata) {
+ log.debug("Finished reconciliation for resource {} with metadata {}",
resource, metadata);
+ getCounter(resource.getClass(), RECONCILIATION, FINISHED).inc();
+ getCounter(
+ resource.getClass(), resource.getMetadata().getNamespace(),
RECONCILIATION, FINISHED);
+ }
+
+ @Override
+ public void cleanupDoneFor(ResourceID resourceID, Map<String, Object>
metadata) {
+ log.debug("Cleanup Done for resource {} with metadata {}", resourceID,
metadata);
+ getCounter(resourceID.getClass(), RECONCILIATION, CLEANUP).inc();
+ resourceID
+ .getNamespace()
+ .ifPresent(ns -> getCounter(resourceID.getClass(), ns, RECONCILIATION,
CLEANUP).inc());
+ }
+
+ @Override
+ public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
+ log.debug("Monitor size for {}", name);
+ Gauge<Integer> gauge =
+ new Gauge<>() {
+ @Override
+ public Integer getValue() {
+ return map.size();
+ }
+ };
+ gauges.put(MetricRegistry.name(name, SIZE), gauge);
+ return map;
+ }
+
+ @Override
+ public void reconciliationExecutionStarted(HasMetadata resource, Map<String,
Object> metadata) {
+ log.debug("Reconciliation execution started");
+ String namespace = resource.getMetadata().getNamespace();
+ getCounter(
+ resource.getClass(), (String) metadata.get(CONTROLLER_NAME),
RECONCILIATIONS_EXECUTIONS)
+ .inc();
+ getCounter(
+ resource.getClass(),
+ namespace,
+ (String) metadata.get(CONTROLLER_NAME),
+ RECONCILIATIONS_EXECUTIONS)
+ .inc();
+ }
+
+ @Override
+ public void reconciliationExecutionFinished(HasMetadata resource,
Map<String, Object> metadata) {
+ log.debug("Reconciliation execution finished");
+ String namespace = resource.getMetadata().getNamespace();
+ getCounter(
+ resource.getClass(), (String) metadata.get(CONTROLLER_NAME),
RECONCILIATIONS_EXECUTIONS)
+ .dec();
+ getCounter(
+ resource.getClass(),
+ namespace,
+ (String) metadata.get(CONTROLLER_NAME),
+ RECONCILIATIONS_EXECUTIONS)
+ .dec();
+ getCounter(
+ resource.getClass(), (String) metadata.get(CONTROLLER_NAME),
RECONCILIATIONS_QUEUE_SIZE)
+ .dec();
+ }
+
+ private long toSeconds(long startTimeInMilliseconds) {
+ return TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis() -
startTimeInMilliseconds);
+ }
+
+ private Histogram getHistogram(Class<?> kclass, String... names) {
+ String name = MetricRegistry.name(kclass.getSimpleName(),
names).toLowerCase();
+ Histogram histogram;
+ if (!histograms.containsKey(name)) {
+ histogram = metricRegistry.histogram(name);
+ histograms.put(name, histogram);
+ } else {
+ histogram = histograms.get(name);
+ }
+ return histogram;
+ }
+
+ private Counter getCounter(Class<?> klass, String... names) {
+ String name = MetricRegistry.name(klass.getSimpleName(),
names).toLowerCase();
+ Counter counter;
+ if (!counters.containsKey(name)) {
+ counter = metricRegistry.counter(name);
+ counters.put(name, counter);
+ } else {
+ counter = counters.get(name);
+ }
+ return counter;
+ }
+
+ private Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>>
getResourceClass(
+ Map<String, Object> metadata) {
+ GroupVersionKind resourceGvk = (GroupVersionKind)
metadata.get(Constants.RESOURCE_GVK_KEY);
+
+ if (resourceGvk == null) {
+ return Optional.empty();
+ }
+
+ Class<? extends BaseResource<?, ?, ?, ?, ?>> resourceClass;
+
+ if (resourceGvk.getKind().equals(SparkApplication.class.getSimpleName())) {
+ resourceClass = SparkApplication.class;
+ } else {
+ return Optional.empty();
+ }
+ return Optional.of(resourceClass);
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJvmSource.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJvmSource.java
new file mode 100644
index 0000000..8cf7df0
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJvmSource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.spark.k8s.operator.metrics.JVMMetricSet;
+import org.apache.spark.metrics.source.Source;
+
+public class OperatorJvmSource implements Source {
+ @Override
+ public String sourceName() {
+ return "jvm";
+ }
+
+ @Override
+ public MetricRegistry metricRegistry() {
+ MetricRegistry metricRegistry = new MetricRegistry();
+ metricRegistry.registerAll(new JVMMetricSet());
+ return metricRegistry;
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ClassLoadingUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ClassLoadingUtils.java
new file mode 100644
index 0000000..1d89a77
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ClassLoadingUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.k8s.operator.listeners.BaseStatusListener;
+
+@Slf4j
+public class ClassLoadingUtils {
+ /**
+ * load status listener classes for given type of BaseStatusListener
+ *
+ * @param clazz Class name of status listener. e.g. SparkAppStatusListener
+ * @param implementationClassNames Comma-separated implementation class
names of given type of
+ * StatusListener
+ * @return list of listeners loaded
+ * @param <T> Class of status listener. e.g. SparkAppStatusListener
+ */
+ public static <T extends BaseStatusListener<?, ?>> List<T> getStatusListener(
+ Class<T> clazz, String implementationClassNames) {
+ List<T> listeners = new ArrayList<>();
+ if (StringUtils.isNotBlank(implementationClassNames)) {
+ try {
+ Set<String> listenerNames =
Utils.sanitizeCommaSeparatedStrAsSet(implementationClassNames);
+ for (String name : listenerNames) {
+ Class listenerClass = Class.forName(name);
+ if (clazz.isAssignableFrom(listenerClass)) {
+ listeners.add((T) listenerClass.getConstructor().newInstance());
+ }
+ }
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ if (log.isErrorEnabled()) {
+ log.error(
+ "Failed to initialize listeners for operator with {}",
implementationClassNames, e);
+ }
+ }
+ }
+ return listeners;
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/LoggingUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/LoggingUtils.java
new file mode 100644
index 0000000..d205019
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/LoggingUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.MDC;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
+
+public class LoggingUtils {
+ public static final class TrackedMDC {
+ public static final String AppAttemptIdKey = "resource.app.attemptId";
+ private final ReentrantLock lock = new ReentrantLock();
+ private Set<String> keys = new HashSet<>();
+
+ public void set(final SparkApplication application) {
+ if (application != null && application.getStatus() != null) {
+ try {
+ lock.lock();
+ ApplicationAttemptSummary summary =
application.getStatus().getCurrentAttemptSummary();
+ if (summary != null && summary.getAttemptInfo() != null) {
+ MDC.put(AppAttemptIdKey,
String.valueOf(summary.getAttemptInfo().getId()));
+ keys.add(AppAttemptIdKey);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void reset() {
+ try {
+ lock.lock();
+ for (String mdcKey : keys) {
+ MDC.remove(mdcKey);
+ }
+ keys.clear();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodPhase.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodPhase.java
new file mode 100644
index 0000000..74c7d66
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodPhase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Defines common (driver) pod phases that may need to be handled explicitly
by the operator. For
+ * pod phase definition: {@see
+ * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase}
+ */
+@Slf4j
+public enum PodPhase {
+ PENDING("pending"),
+ RUNNING("running"),
+ FAILED("failed"),
+ SUCCEEDED("succeeded"),
+ UNKNOWN("unknown");
+
+ private final String description;
+
+ PodPhase(String description) {
+ this.description = description;
+ }
+
+ public static PodPhase getPhase(final Pod pod) {
+ if (pod == null || pod.getStatus() == null) {
+ return UNKNOWN;
+ }
+ for (PodPhase podPhase : values()) {
+ if (podPhase.description.equalsIgnoreCase(pod.getStatus().getPhase())) {
+ return podPhase;
+ }
+ }
+ if (log.isErrorEnabled()) {
+ log.error("Unable to determine pod phase from status: {}",
pod.getStatus());
+ }
+ return UNKNOWN;
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodUtils.java
new file mode 100644
index 0000000..9df3f53
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static
org.apache.spark.k8s.operator.utils.ModelUtils.findDriverMainContainerStatus;
+
+import java.util.List;
+
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.Pod;
+
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+
+public class PodUtils {
+
+ public static final String POD_READY_CONDITION_TYPE = "ready";
+
+ /** Determine whether given pod is up running and ready */
+ public static boolean isPodReady(final Pod pod) {
+ if (!PodPhase.RUNNING.equals(PodPhase.getPhase(pod))) {
+ return false;
+ }
+ if (pod == null
+ || pod.getStatus() == null
+ || pod.getStatus().getConditions() == null
+ || pod.getStatus().getConditions().isEmpty()) {
+ return false;
+ }
+ return pod.getStatus().getConditions().parallelStream()
+ .anyMatch(
+ condition ->
+ POD_READY_CONDITION_TYPE.equalsIgnoreCase(condition.getType())
+ && Boolean.parseBoolean(condition.getStatus()));
+ }
+
+ /**
+ * Determine whether the driver pod is started. Driver is considered as
'started' if any of Spark
+ * container is started and ready
+ *
+ * @param driver the driver pod
+ * @param spec expected spec for the SparkApp
+ */
+ public static boolean isDriverPodStarted(final Pod driver, final
ApplicationSpec spec) {
+ // Consider pod as 'started' if any of Spark container is started and ready
+ if (driver == null
+ || driver.getStatus() == null
+ || driver.getStatus().getContainerStatuses() == null
+ || driver.getStatus().getContainerStatuses().isEmpty()) {
+ return false;
+ }
+
+ List<ContainerStatus> containerStatusList =
driver.getStatus().getContainerStatuses();
+
+ // If there's only one container in given pod, evaluate it
+ // Otherwise, use the provided name as filter.
+ if (containerStatusList.size() == 1) {
+ return containerStatusList.get(0).getReady();
+ }
+
+ return findDriverMainContainerStatus(spec, containerStatusList).stream()
+ .anyMatch(ContainerStatus::getReady);
+ }
+
+ /** Returns true if the given container has terminated */
+ public static boolean isContainerTerminated(final ContainerStatus
containerStatus) {
+ return containerStatus != null
+ && containerStatus.getState() != null
+ && containerStatus.getState().getTerminated() != null;
+ }
+
+ /** Returns true if the given container has ever restarted */
+ public static boolean isContainerRestarted(final ContainerStatus
containerStatus) {
+ return containerStatus != null && containerStatus.getRestartCount() > 0;
+ }
+
+ /** Returns true if the given container has exited with non-zero status */
+ public static boolean isContainerFailed(final ContainerStatus
containerStatus) {
+ return isContainerTerminated(containerStatus)
+ && containerStatus.getState().getTerminated().getExitCode() > 0;
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ProbeUtil.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ProbeUtil.java
new file mode 100644
index 0000000..4ac1fd7
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ProbeUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+import com.sun.net.httpserver.HttpExchange;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RuntimeInfo;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ProbeUtil {
+ public static void sendMessage(HttpExchange httpExchange, int code, String
message)
+ throws IOException {
+ try (OutputStream outputStream = httpExchange.getResponseBody()) {
+ byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
+ httpExchange.sendResponseHeaders(code, bytes.length);
+ outputStream.write(bytes);
+ outputStream.flush();
+ }
+ }
+
+ public static Optional<Boolean> areOperatorsStarted(List<Operator>
operators) {
+ return operators.stream()
+ .map(
+ operator -> {
+ RuntimeInfo runtimeInfo = operator.getRuntimeInfo();
+ if (runtimeInfo != null) {
+ if (!operator.getRuntimeInfo().isStarted()) {
+ log.error("Operator is not running");
+ return false;
+ }
+ return true;
+ }
+ return false;
+ })
+ .reduce((a, b) -> a && b);
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
new file mode 100644
index 0000000..c3491d1
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import org.apache.spark.k8s.operator.Constants;
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+
+/** Handy utils for create & manage Application Status */
+public class SparkAppStatusUtils {
+
+ public static boolean isValidApplicationStatus(SparkApplication app) {
+ // null check
+ return app.getStatus() != null
+ && app.getStatus().getCurrentState() != null
+ && app.getStatus().getCurrentState().getCurrentStateSummary() != null;
+ }
+
+ public static ApplicationState driverUnexpectedRemoved() {
+ return new ApplicationState(
+ ApplicationStateSummary.Failed,
Constants.DRIVER_UNEXPECTED_REMOVED_MESSAGE);
+ }
+
+ public static ApplicationState driverLaunchTimedOut() {
+ return new ApplicationState(
+ ApplicationStateSummary.DriverStartTimedOut,
Constants.DRIVER_LAUNCH_TIMEOUT_MESSAGE);
+ }
+
+ public static ApplicationState driverReadyTimedOut() {
+ return new ApplicationState(
+ ApplicationStateSummary.DriverReadyTimedOut,
Constants.DRIVER_LAUNCH_TIMEOUT_MESSAGE);
+ }
+
+ public static ApplicationState executorLaunchTimedOut() {
+ return new ApplicationState(
+ ApplicationStateSummary.ExecutorsStartTimedOut,
Constants.EXECUTOR_LAUNCH_TIMEOUT_MESSAGE);
+ }
+
+ public static ApplicationState appCancelled() {
+ return new ApplicationState(
+ ApplicationStateSummary.ResourceReleased,
Constants.APP_CANCELLED_MESSAGE);
+ }
+
+ public static boolean hasReachedState(
+ SparkApplication application, ApplicationState stateToCheck) {
+ if (!isValidApplicationStatus(application)) {
+ return false;
+ }
+ return
application.getStatus().getStateTransitionHistory().keySet().parallelStream()
+ .anyMatch(
+ stateId ->
+ stateToCheck.equals(
+
application.getStatus().getStateTransitionHistory().get(stateId)));
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkExceptionUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkExceptionUtils.java
new file mode 100644
index 0000000..a7be937
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkExceptionUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+public class SparkExceptionUtils {
+ public static boolean
isConflictForExistingResource(KubernetesClientException e) {
+ return e != null
+ && e.getCode() == 409
+ && e.getStatus() != null
+ && StringUtils.isNotEmpty(e.getStatus().toString())
+ && e.getStatus().toString().toLowerCase().contains("alreadyexists");
+ }
+
+ public static String buildGeneralErrorMessage(Exception e) {
+ return ExceptionUtils.getStackTrace(e);
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
new file mode 100644
index 0000000..aae6df9
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.apache.spark.k8s.operator.Constants.LABEL_RESOURCE_NAME;
+import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_OPERATOR_NAME;
+import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VALUE;
+import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.k8s.operator.Constants;
+import org.apache.spark.k8s.operator.SparkApplication;
+
+public class Utils {
+ public static Set<String> sanitizeCommaSeparatedStrAsSet(String str) {
+ if (StringUtils.isBlank(str)) {
+ return Collections.emptySet();
+ }
+ if ("*".equals(str)) {
+ return Collections.emptySet();
+ }
+ return Arrays.stream(str.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toSet());
+ }
+
+ public static String labelsAsStr(Map<String, String> labels) {
+ return labels.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(), e.getValue()))
+ .collect(Collectors.joining(","));
+ }
+
+ public static Map<String, String> commonOperatorResourceLabels() {
+ Map<String, String> labels = new HashMap<>();
+ labels.put(LABEL_RESOURCE_NAME, OPERATOR_APP_NAME.getValue());
+ return labels;
+ }
+
+ public static Map<String, String> defaultOperatorConfigLabels() {
+ Map<String, String> labels = new HashMap<>(commonOperatorResourceLabels());
+ labels.put("app.kubernetes.io/component",
"operator-dynamic-config-overrides");
+ return labels;
+ }
+
+ public static Map<String, String> commonManagedResourceLabels() {
+ Map<String, String> labels = new HashMap<>();
+ labels.put(LABEL_SPARK_OPERATOR_NAME, OPERATOR_APP_NAME.getValue());
+ return labels;
+ }
+
+ public static Map<String, String> sparkAppResourceLabels(final
SparkApplication app) {
+ return sparkAppResourceLabels(app.getMetadata().getName());
+ }
+
+ public static Map<String, String> sparkAppResourceLabels(final String
appName) {
+ Map<String, String> labels = commonManagedResourceLabels();
+ labels.put(Constants.LABEL_SPARK_APPLICATION_NAME, appName);
+ return labels;
+ }
+
+ public static Map<String, String> driverLabels(final SparkApplication
sparkApplication) {
+ Map<String, String> labels = sparkAppResourceLabels(sparkApplication);
+ labels.put(Constants.LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_DRIVER_VALUE);
+ return labels;
+ }
+
+ public static Map<String, String> executorLabels(final SparkApplication
sparkApplication) {
+ Map<String, String> labels = sparkAppResourceLabels(sparkApplication);
+ labels.put(Constants.LABEL_SPARK_ROLE_NAME,
LABEL_SPARK_ROLE_EXECUTOR_VALUE);
+ return labels;
+ }
+
+ public static Set<String> getWatchedNamespaces() {
+ return
Utils.sanitizeCommaSeparatedStrAsSet(OPERATOR_WATCHED_NAMESPACES.getValue());
+ }
+
+ /**
+ * Labels to be applied to all created resources, as a comma-separated string
+ *
+ * @return labels string
+ */
+ public static String commonResourceLabelsStr() {
+ return labelsAsStr(commonManagedResourceLabels());
+ }
+}
diff --git a/spark-operator/src/main/resources/EcsLayout.json
b/spark-operator/src/main/resources/EcsLayout.json
new file mode 100644
index 0000000..4660280
--- /dev/null
+++ b/spark-operator/src/main/resources/EcsLayout.json
@@ -0,0 +1,49 @@
+{
+ "@timestamp": {
+ "$resolver": "timestamp",
+ "pattern": {
+ "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
+ "timeZone": "UTC"
+ }
+ },
+ "ecs.version": "1.6.0",
+ "log.level": {
+ "$resolver": "level",
+ "field": "name"
+ },
+ "message": {
+ "$resolver": "message",
+ "stringified": true
+ },
+ "process.thread.name": {
+ "$resolver": "thread",
+ "field": "name"
+ },
+ "log.logger": {
+ "$resolver": "logger",
+ "field": "name"
+ },
+ "labels": {
+ "$resolver": "mdc",
+ "flatten": true,
+ "stringified": true
+ },
+ "tags": {
+ "$resolver": "ndc"
+ },
+ "error.type": {
+ "$resolver": "exception",
+ "field": "className"
+ },
+ "error.message": {
+ "$resolver": "exception",
+ "field": "message"
+ },
+ "error.stack_trace": {
+ "$resolver": "exception",
+ "field": "stackTrace",
+ "stackTrace": {
+ "stringified": true
+ }
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactoryTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactoryTest.java
new file mode 100644
index 0000000..a9a341b
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactoryTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static
org.apache.spark.k8s.operator.metrics.MetricsSystemFactory.parseSinkProperties;
+import static org.junit.Assert.assertThrows;
+
+import java.util.Properties;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class MetricsSystemFactoryTest {
+
+ @Test
+ void testMetricsSystemFailFastWithNoClassFullName() {
+ Properties properties = new Properties();
+ properties.put("sink.mocksink.period", "10");
+ properties.put("sink.console.class",
"org.apache.spark.metrics.sink.ConsoleSink");
+ RuntimeException e =
+ assertThrows(RuntimeException.class, () ->
parseSinkProperties(properties));
+ Assertions.assertEquals(
+ "mocksink provides properties, but does not provide full class name",
e.getMessage());
+ }
+
+ @Test
+ void testMetricsSystemFailFastWithNotFoundClassName() {
+ Properties properties = new Properties();
+ properties.put("sink.console.class",
"org.apache.spark.metrics.sink.FooSink");
+ RuntimeException e =
+ assertThrows(RuntimeException.class, () ->
parseSinkProperties(properties));
+ Assertions.assertEquals(
+ "Fail to find class org.apache.spark.metrics.sink.FooSink",
e.getMessage());
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemTest.java
new file mode 100644
index 0000000..5bfccc8
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.metrics.sink.MockSink;
+import org.apache.spark.metrics.sink.Sink;
+import org.apache.spark.metrics.source.Source;
+
+class MetricsSystemTest {
+ @Test
+ void testMetricsSystemWithResourcesAdd() {
+ MetricsSystem metricsSystem = new MetricsSystem();
+ Set<Source> sourcesList = metricsSystem.getSources();
+ Set<Sink> sinks = metricsSystem.getSinks();
+ metricsSystem.start();
+ Assertions.assertEquals(1, sourcesList.size());
+ // By default, only prometheus sink is enabled
+ Assertions.assertEquals(1, sinks.size());
+ Assertions.assertFalse(metricsSystem.getRegistry().getMetrics().isEmpty());
+ metricsSystem.stop();
+ Assertions.assertTrue(metricsSystem.getRegistry().getMetrics().isEmpty());
+ }
+
+ @Test
+ void testMetricsSystemWithCustomizedSink() {
+ Properties properties = new Properties();
+ properties.put("sink.mocksink.class",
"org.apache.spark.k8s.operator.metrics.sink.MockSink");
+ properties.put("sink.mocksink.period", "10");
+ MetricsSystem metricsSystem = new MetricsSystem(properties);
+ metricsSystem.start();
+ Assertions.assertEquals(2, metricsSystem.getSinks().size());
+ Optional<Sink> mockSinkOptional =
+ metricsSystem.getSinks().stream().filter(sink -> sink instanceof
MockSink).findFirst();
+ Assertions.assertTrue(mockSinkOptional.isPresent());
+ Sink mockSink = mockSinkOptional.get();
+ metricsSystem.stop();
+ MockSink sink = (MockSink) mockSink;
+ Assertions.assertEquals(sink.getPollPeriod(), 10);
+ Assertions.assertEquals(sink.getTimeUnit(), TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testMetricsSystemWithTwoSinkConfigurations() {
+ Properties properties = new Properties();
+ properties.put("sink.mocksink.class",
"org.apache.spark.k8s.operator.metrics.sink.MockSink");
+ properties.put("sink.mocksink.period", "10");
+ properties.put("sink.console.class",
"org.apache.spark.metrics.sink.ConsoleSink");
+ MetricsSystem metricsSystem = new MetricsSystem(properties);
+ metricsSystem.start();
+ Assertions.assertEquals(3, metricsSystem.getSinks().size());
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/sink/MockSink.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/sink/MockSink.java
new file mode 100644
index 0000000..2026d9c
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/sink/MockSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.sink;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.metrics.sink.Sink;
+
+@SuppressWarnings("PMD")
+public class MockSink implements Sink {
+ private static final Logger logger = LoggerFactory.getLogger(MockSink.class);
+ private Properties properties;
+ private MetricRegistry metricRegistry;
+ public static final String DEFAULT_UNIT = "SECONDS";
+ public static final int DEFAULT_PERIOD = 20;
+ public static final String KEY_PERIOD = "period";
+ public static final String KEY_UNIT = "unit";
+
+ public int getPollPeriod() {
+ return Integer.parseInt((String) properties.getOrDefault(KEY_PERIOD,
DEFAULT_PERIOD));
+ }
+
+ public TimeUnit getTimeUnit() {
+ return TimeUnit.valueOf((String) properties.getOrDefault(KEY_UNIT,
DEFAULT_UNIT));
+ }
+
+ public MockSink(Properties properties, MetricRegistry metricRegistry) {
+ logger.info("Current properties: {}", properties);
+ this.properties = properties;
+ this.metricRegistry = metricRegistry;
+ }
+
+ @Override
+ public void start() {
+ logger.info("Mock sink started");
+ }
+
+ @Override
+ public void stop() {
+ logger.info("Mock sink stopped");
+ }
+
+ @Override
+ public void report() {
+ logger.info("Mock sink reported");
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
new file mode 100644
index 0000000..bd34e33
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import okhttp3.Interceptor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+
+@EnableKubernetesMockClient(crud = true)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@SuppressWarnings("PMD")
+@SuppressFBWarnings(
+ value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD", "UUF_UNUSED_FIELD"},
+ justification = "Unwritten fields are covered by Kubernetes mock client")
+class KubernetesMetricsInterceptorTest {
+
+ private KubernetesMockServer mockServer;
+ private KubernetesClient kubernetesClient;
+
+ @AfterEach
+ void cleanUp() {
+ mockServer.reset();
+ }
+
+ @Test
+ @Order(1)
+ void testMetricsEnabled() {
+ KubernetesMetricsInterceptor metricsInterceptor = new
KubernetesMetricsInterceptor();
+ List<Interceptor> interceptors =
Collections.singletonList(metricsInterceptor);
+ KubernetesClient client =
+ KubernetesClientFactory.buildKubernetesClient(
+ interceptors, kubernetesClient.getConfiguration());
+ SparkApplication sparkApplication = createSparkApplication();
+ ConfigMap configMap = createConfigMap();
+
+ Map<String, Metric> metrics = new
HashMap<>(metricsInterceptor.metricRegistry().getMetrics());
+ Assertions.assertEquals(9, metrics.size());
+ client.resource(sparkApplication).create();
+ client.resource(configMap).get();
+ Map<String, Metric> metrics2 = new
HashMap<>(metricsInterceptor.metricRegistry().getMetrics());
+ Assertions.assertEquals(17, metrics2.size());
+ List<String> expectedMetricsName =
+ Arrays.asList(
+ "http.response.201",
+ "http.request.post",
+ "sparkapplications.post",
+ "spark-test.sparkapplications.post",
+ "spark-test.sparkapplications.post",
+ "configmaps.get",
+ "spark-system.configmaps.get",
+ "2xx",
+ "4xx");
+ expectedMetricsName.stream()
+ .forEach(
+ name -> {
+ Meter metric = (Meter) metrics2.get(name);
+ Assertions.assertEquals(metric.getCount(), 1);
+ });
+ client.resource(sparkApplication).delete();
+ }
+
+ @Test
+ @Order(2)
+ void testWhenKubernetesServerNotWorking() {
+ KubernetesMetricsInterceptor metricsInterceptor = new
KubernetesMetricsInterceptor();
+ List<Interceptor> interceptors =
Collections.singletonList(metricsInterceptor);
+ KubernetesClient client =
+ KubernetesClientFactory.buildKubernetesClient(
+ interceptors, kubernetesClient.getConfiguration());
+ int retry = client.getConfiguration().getRequestRetryBackoffLimit();
+ mockServer.shutdown();
+ SparkApplication sparkApplication = createSparkApplication();
+ assertThrows(
+ Exception.class,
+ () -> {
+ client.resource(sparkApplication).create();
+ });
+
+ Map<String, Metric> map = metricsInterceptor.metricRegistry().getMetrics();
+ Assertions.assertEquals(12, map.size());
+ Meter metric = (Meter) map.get("failed");
+ Assertions.assertEquals(metric.getCount(), retry + 1);
+ }
+
+ private static SparkApplication createSparkApplication() {
+ ObjectMeta meta = new ObjectMeta();
+ meta.setName("sample-spark-application");
+ meta.setNamespace("spark-test");
+ SparkApplication sparkApplication = new SparkApplication();
+ sparkApplication.setMetadata(meta);
+ ApplicationSpec applicationSpec = new ApplicationSpec();
+ applicationSpec.setMainClass("org.apache.spark.examples.SparkPi");
+
applicationSpec.setJars("local:///opt/spark/examples/jars/spark-examples.jar");
+ applicationSpec.setSparkConf(
+ Map.of(
+ "spark.executor.instances", "5",
+ "spark.kubernetes.container.image", "spark",
+ "spark.kubernetes.namespace", "spark-test",
+ "spark.kubernetes.authenticate.driver.serviceAccountName",
"spark"));
+ sparkApplication.setSpec(applicationSpec);
+ return sparkApplication;
+ }
+
+ private static ConfigMap createConfigMap() {
+ ObjectMeta meta = new ObjectMeta();
+ meta.setName("spark-job-operator-configuration");
+ meta.setNamespace("spark-system");
+ ConfigMap configMap = new ConfigMap();
+ configMap.setMetadata(meta);
+ return configMap;
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetricsTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetricsTest.java
new file mode 100644
index 0000000..5fab771
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetricsTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import java.util.Map;
+
+import com.codahale.metrics.Metric;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
+import
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+
+class OperatorJosdkMetricsTest {
+ private static final String DEFAULT_NAMESPACE = "default";
+ private static final String TEST_RESOURCE_NAME = "test1";
+ private static final ResourceID resourceId = new ResourceID("spark-pi",
"testns");
+
+ private static final Map<String, Object> metadata =
+ Map.of(
+ Constants.RESOURCE_GVK_KEY,
+ GroupVersionKind.gvkFor(SparkApplication.class),
+ Constants.CONTROLLER_NAME,
+ "test-controller-name");
+ private static final String controllerName = "test-controller";
+
+ private OperatorJosdkMetrics operatorMetrics;
+
+ @BeforeEach
+ public void setup() {
+ operatorMetrics = new OperatorJosdkMetrics();
+ }
+
+ @Test
+ void testTimeControllerExecution() throws Exception {
+ TestingExecutionBase<SparkApplication> successExecution = new
TestingExecutionBase<>();
+ operatorMetrics.timeControllerExecution(successExecution);
+ Map<String, Metric> metrics =
operatorMetrics.metricRegistry().getMetrics();
+ Assertions.assertEquals(4, metrics.size());
+
Assertions.assertTrue(metrics.containsKey("sparkapplication.test-controller.reconcile.both"));
+ Assertions.assertTrue(
+
metrics.containsKey("sparkapplication.testns.test-controller.reconcile.both"));
+ Assertions.assertTrue(
+
metrics.containsKey("sparkapplication.test-controller.reconcile.success.both"));
+ Assertions.assertTrue(
+
metrics.containsKey("sparkapplication.testns.test-controller.reconcile.success.both"));
+
+ FooTestingExecutionBase<SparkApplication> failedExecution = new
FooTestingExecutionBase<>();
+ try {
+ operatorMetrics.timeControllerExecution(failedExecution);
+ } catch (Exception e) {
+ Assertions.assertEquals(e.getMessage(), "Foo exception");
+ Assertions.assertEquals(8, metrics.size());
+ Assertions.assertTrue(
+
metrics.containsKey("sparkapplication.test-controller.reconcile.failure"));
+ Assertions.assertTrue(
+ metrics.containsKey(
+ "sparkapplication.test-controller.reconcile.failure.exception"
+ + ".nosuchfieldexception"));
+ Assertions.assertTrue(
+
metrics.containsKey("sparkapplication.testns.test-controller.reconcile.failure"));
+ Assertions.assertTrue(
+ metrics.containsKey(
+ "sparkapplication.testns.test-controller.reconcile.failure."
+ + "exception.nosuchfieldexception"));
+ }
+ }
+
+ @Test
+ void testReconciliationFinished() {
+ operatorMetrics.finishedReconciliation(buildNamespacedResource(),
metadata);
+ Map<String, Metric> metrics =
operatorMetrics.metricRegistry().getMetrics();
+ Assertions.assertEquals(2, metrics.size());
+
Assertions.assertTrue(metrics.containsKey("configmap.default.reconciliation.finished"));
+
Assertions.assertTrue(metrics.containsKey("configmap.reconciliation.finished"));
+ }
+
+ @Test
+ void testReconciliationExecutionStartedAndFinished() {
+ operatorMetrics.reconciliationExecutionStarted(buildNamespacedResource(),
metadata);
+ Map<String, Metric> metrics =
operatorMetrics.metricRegistry().getMetrics();
+ Assertions.assertEquals(2, metrics.size());
+ Assertions.assertTrue(
+
metrics.containsKey("configmap.test-controller-name.reconciliations.executions"));
+ Assertions.assertTrue(
+
metrics.containsKey("configmap.default.test-controller-name.reconciliations.executions"));
+ operatorMetrics.reconciliationExecutionFinished(buildNamespacedResource(),
metadata);
+ Assertions.assertEquals(3, metrics.size());
+ Assertions.assertTrue(
+
metrics.containsKey("configmap.test-controller-name.reconciliations.queue.size"));
+ }
+
+ @Test
+ void testReceivedEvent() {
+ Event event = new ResourceEvent(ResourceAction.ADDED, resourceId,
buildNamespacedResource());
+ operatorMetrics.receivedEvent(event, metadata);
+ Map<String, Metric> metrics =
operatorMetrics.metricRegistry().getMetrics();
+ Assertions.assertEquals(2, metrics.size());
+
Assertions.assertTrue(metrics.containsKey("sparkapplication.added.resource.event"));
+
Assertions.assertTrue(metrics.containsKey("sparkapplication.testns.added.resource.event"));
+ }
+
+ private static class TestingExecutionBase<T> implements
Metrics.ControllerExecution<T> {
+ @Override
+ public String controllerName() {
+ return controllerName;
+ }
+
+ @Override
+ public String successTypeName(Object o) {
+ return "both";
+ }
+
+ @Override
+ public ResourceID resourceID() {
+ return resourceId;
+ }
+
+ @Override
+ public Map<String, Object> metadata() {
+ return metadata;
+ }
+
+ @Override
+ public String name() {
+ return "reconcile";
+ }
+
+ @Override
+ public T execute() throws Exception {
+ Thread.sleep(1000);
+ return null;
+ }
+ }
+
+ private static class FooTestingExecutionBase<T> implements
Metrics.ControllerExecution<T> {
+ @Override
+ public String controllerName() {
+ return controllerName;
+ }
+
+ @Override
+ public String successTypeName(Object o) {
+ return "resource";
+ }
+
+ @Override
+ public ResourceID resourceID() {
+ return resourceId;
+ }
+
+ @Override
+ public Map<String, Object> metadata() {
+ return metadata;
+ }
+
+ @Override
+ public String name() {
+ return "reconcile";
+ }
+
+ @Override
+ public T execute() throws Exception {
+ throw new NoSuchFieldException("Foo exception");
+ }
+ }
+
+ private HasMetadata buildNamespacedResource() {
+ ConfigMap cm = new ConfigMap();
+ cm.setMetadata(
+ new ObjectMetaBuilder()
+ .withName(TEST_RESOURCE_NAME)
+ .withNamespace(DEFAULT_NAMESPACE)
+ .build());
+ return cm;
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
new file mode 100644
index 0000000..59b3989
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
import static org.apache.spark.k8s.operator.Constants.API_GROUP;
import static org.apache.spark.k8s.operator.Constants.API_VERSION;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Map;

import io.fabric8.kubernetes.api.model.ObjectMeta;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkApplication;
+
+public class TestUtils {
+ public static SparkApplication createMockApp(String namespace) {
+ SparkApplication cr = new SparkApplication();
+ cr.setKind("SparkApplication");
+ cr.setApiVersion(String.join("/", API_GROUP, API_VERSION));
+ cr.setSpec(cr.initSpec());
+ ObjectMeta meta = new ObjectMeta();
+ meta.setGeneration(0L);
+ meta.setLabels(Map.of(Constants.LABEL_SENTINEL_RESOURCE, "true"));
+ meta.setName("sentinel");
+ meta.setNamespace(namespace);
+ cr.setMetadata(meta);
+ return cr;
+ }
+
+ public static void cleanPropertiesFile(String filePath) {
+ File myObj = new File(filePath);
+ if (!myObj.delete()) {
+ throw new RuntimeException("Failed to clean properties file: " +
filePath);
+ }
+ }
+
+ public static boolean notTimedOut(long startTime, long maxWaitTimeInMills) {
+ long elapsedTimeInMills = calculateElapsedTimeInMills(startTime);
+ return elapsedTimeInMills < maxWaitTimeInMills;
+ }
+
+ public static long calculateElapsedTimeInMills(long startTime) {
+ return System.currentTimeMillis() - startTime;
+ }
+}
diff --git a/spark-operator/src/test/resources/log4j2.properties
b/spark-operator/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..53b4e9a
--- /dev/null
+++ b/spark-operator/src/test/resources/log4j2.properties
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+status=info
+strict=true
+dest=out
+name=PropertiesConfig
+property.filename=/tmp/spark-operator
+filter.threshold.type=ThresholdFilter
+filter.threshold.level=warn
+# console
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d %p %X %C{1.} [%t] %m%n
+appender.console.filter.threshold.type=ThresholdFilter
+appender.console.filter.threshold.level=info
+# rolling JSON
+appender.rolling.type=RollingFile
+appender.rolling.name=RollingFile
+appender.rolling.append=true
+appender.rolling.fileName=${filename}.log
+appender.rolling.filePattern=${filename}-%i.log.gz
+appender.rolling.layout.type=JsonTemplateLayout
+appender.rolling.layout.eventTemplateUri=classpath:EcsLayout.json
+appender.rolling.policies.type=Policies
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100MB
+appender.rolling.strategy.type=DefaultRolloverStrategy
+appender.rolling.strategy.max=20
+appender.rolling.immediateFlush=true
+# root logger accepts all levels; chatty loggers such as io.netty are capped at warn
+rootLogger.level=all
+logger.netty.name=io.netty
+logger.netty.level=warn
+log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
+rootLogger.appenderRef.stdout.ref=STDOUT
+rootLogger.appenderRef.rolling.ref=RollingFile
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]