This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 61d013d  [SPARK-48984] Add Controller Metrics System and Utils
61d013d is described below

commit 61d013d7cc2510c8169fccca30cb7103b75783ef
Author: zhou-jiang <[email protected]>
AuthorDate: Wed Jul 24 22:46:23 2024 -0700

    [SPARK-48984] Add Controller Metrics System and Utils
    
    ### What changes were proposed in this pull request?
    
    This is a breakdown PR of #12 — it defines the metrics system and utils 
classes to be used by the reconcilers.
    
    It also refactors a few methods from the previous utils class 
`org.apache.spark.k8s.operator.reconciler.SparkReconcilerUtils` into the common 
utils package.
    
    ### Why are the changes needed?
    
    Breakdown PRs help us move with more flexibility.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    CIs
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #23 from jiangzho/controller_utils.
    
    Authored-by: zhou-jiang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 spark-operator/build.gradle                        |  20 +
 .../operator/client/KubernetesClientFactory.java   |  51 +++
 .../k8s/operator/config/SparkOperatorConf.java     | 467 +++++++++++++++++++++
 .../k8s/operator/listeners/BaseStatusListener.java |  28 ++
 .../operator/listeners/SparkAppStatusListener.java |  26 ++
 .../spark/k8s/operator/metrics/JVMMetricSet.java   |  74 ++++
 .../spark/k8s/operator/metrics/MetricsService.java |  57 +++
 .../spark/k8s/operator/metrics/MetricsSystem.java  | 146 +++++++
 .../k8s/operator/metrics/MetricsSystemFactory.java | 105 +++++
 .../metrics/PrometheusPullModelHandler.java        |  78 ++++
 .../source/KubernetesMetricsInterceptor.java       | 172 ++++++++
 .../metrics/source/OperatorJosdkMetrics.java       | 312 ++++++++++++++
 .../operator/metrics/source/OperatorJvmSource.java |  39 ++
 .../k8s/operator/utils/ClassLoadingUtils.java      |  68 +++
 .../spark/k8s/operator/utils/LoggingUtils.java     |  64 +++
 .../apache/spark/k8s/operator/utils/PodPhase.java  |  58 +++
 .../apache/spark/k8s/operator/utils/PodUtils.java  |  98 +++++
 .../apache/spark/k8s/operator/utils/ProbeUtil.java |  61 +++
 .../k8s/operator/utils/SparkAppStatusUtils.java    |  73 ++++
 .../k8s/operator/utils/SparkExceptionUtils.java    |  38 ++
 .../org/apache/spark/k8s/operator/utils/Utils.java | 113 +++++
 spark-operator/src/main/resources/EcsLayout.json   |  49 +++
 .../operator/metrics/MetricsSystemFactoryTest.java |  52 +++
 .../k8s/operator/metrics/MetricsSystemTest.java    |  77 ++++
 .../spark/k8s/operator/metrics/sink/MockSink.java  |  69 +++
 .../source/KubernetesMetricsInterceptorTest.java   | 153 +++++++
 .../metrics/source/OperatorJosdkMetricsTest.java   | 204 +++++++++
 .../apache/spark/k8s/operator/utils/TestUtils.java |  63 +++
 .../src/test/resources/log4j2.properties           |  52 +++
 29 files changed, 2867 insertions(+)

diff --git a/spark-operator/build.gradle b/spark-operator/build.gradle
index 8996708..6a733c6 100644
--- a/spark-operator/build.gradle
+++ b/spark-operator/build.gradle
@@ -21,6 +21,15 @@ dependencies {
   implementation("org.apache.logging.log4j:log4j-1.2-api:$log4jVersion")
   
implementation("org.apache.logging.log4j:log4j-layout-template-json:$log4jVersion")
 
+  // metrics
+  
implementation("io.dropwizard.metrics:metrics-core:$dropwizardMetricsVersion")
+  implementation("io.dropwizard.metrics:metrics-jvm:$dropwizardMetricsVersion")
+  compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
+    exclude group: 'com.squareup.okio'
+    exclude group: 'com.squareup.okhttp3'
+    exclude group: "org.apache.logging.log4j"
+    exclude group: "org.slf4j"
+  }
   compileOnly("org.projectlombok:lombok:$lombokVersion")
 
   annotationProcessor("org.projectlombok:lombok:$lombokVersion")
@@ -30,6 +39,17 @@ dependencies {
     exclude group: 'com.squareup.okio'
     exclude group: 'io.fabric8'
   }
+  testImplementation("io.fabric8:kubernetes-server-mock:$fabric8Version") {
+    exclude group: 'junit'
+    exclude group: 'com.squareup.okhttp3'
+  }
+  
testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
+    exclude group: 'com.squareup.okio'
+    exclude group: 'com.squareup.okhttp3'
+    exclude group: "org.apache.logging.log4j"
+    exclude group: "org.slf4j"
+  }
+  testImplementation("com.squareup.okhttp3:mockwebserver:$okHttpVersion")
   testImplementation platform("org.junit:junit-bom:$junitVersion")
   testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion")
   testRuntimeOnly("org.junit.platform:junit-platform-launcher")
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
new file mode 100644
index 0000000..6d3e213
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.spark.k8s.operator.client;
+
+import java.util.List;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+
+/** Build Kubernetes Client with metrics configured */
+public class KubernetesClientFactory {
+  public static KubernetesClient buildKubernetesClient(final List<Interceptor> 
interceptors) {
+    return buildKubernetesClient(interceptors, null);
+  }
+
+  public static KubernetesClient buildKubernetesClient(
+      final List<Interceptor> interceptors, final Config 
kubernetesClientConfig) {
+    return new KubernetesClientBuilder()
+        .withConfig(kubernetesClientConfig)
+        .withHttpClientFactory(
+            new OkHttpClientFactory() {
+              @Override
+              protected void additionalConfig(OkHttpClient.Builder builder) {
+                for (Interceptor interceptor : interceptors) {
+                  builder.addInterceptor(interceptor);
+                }
+              }
+            })
+        .build();
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
new file mode 100644
index 0000000..a3f791a
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.config;
+
+import java.time.Duration;
+
+import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
+import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
+import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
+import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.utils.Utils;
+
+/** Spark Operator Configuration options. */
+@Slf4j
+public class SparkOperatorConf {
+  public static final ConfigOption<String> OPERATOR_APP_NAME =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.name")
+          .enableDynamicOverride(false)
+          .description("Name of the operator.")
+          .typeParameterClass(String.class)
+          .defaultValue("spark-kubernetes-operator")
+          .build();
+
+  public static final ConfigOption<String> OPERATOR_NAMESPACE =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.namespace")
+          .enableDynamicOverride(false)
+          .description("Namespace that operator is deployed within.")
+          .typeParameterClass(String.class)
+          .defaultValue("default")
+          .build();
+
+  public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.watchedNamespaces")
+          .enableDynamicOverride(true)
+          .description(
+              "Comma-separated list of namespaces that the operator would be 
watching for "
+                  + "Spark resources. If set to '*', operator would watch all 
namespaces.")
+          .typeParameterClass(String.class)
+          .defaultValue("default")
+          .build();
+
+  public static final ConfigOption<Boolean> 
TERMINATE_ON_INFORMER_FAILURE_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.terminateOnInformerFailureEnabled")
+          .enableDynamicOverride(false)
+          .description(
+              "Enable to indicate informer errors should stop operator 
startup. If "
+                  + "disabled, operator startup will ignore recoverable 
errors, "
+                  + "caused for example by RBAC issues and will retry "
+                  + "periodically.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(false)
+          .build();
+
+  public static final ConfigOption<Integer> 
RECONCILER_TERMINATION_TIMEOUT_SECONDS =
+      ConfigOption.<Integer>builder()
+          
.key("spark.kubernetes.operator.reconciler.terminationTimeoutSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Grace period for operator shutdown before reconciliation 
threads are killed.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(30)
+          .build();
+
+  public static final ConfigOption<Integer> RECONCILER_PARALLELISM =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.parallelism")
+          .enableDynamicOverride(false)
+          .description(
+              "Thread pool size for Spark Operator reconcilers. Unbounded pool 
would be used if "
+                  + "set to non-positive number.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(50)
+          .build();
+
+  public static final ConfigOption<Long> 
RECONCILER_FOREGROUND_REQUEST_TIMEOUT_SECONDS =
+      ConfigOption.<Long>builder()
+          
.key("spark.kubernetes.operator.reconciler.foregroundRequestTimeoutSeconds")
+          .enableDynamicOverride(true)
+          .description(
+              "Timeout (in seconds) to for requests made to API server. This "
+                  + "applies only to foreground requests.")
+          .typeParameterClass(Long.class)
+          .defaultValue(30L)
+          .build();
+
+  public static final ConfigOption<Long> RECONCILER_INTERVAL_SECONDS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.reconciler.intervalSeconds")
+          .enableDynamicOverride(true)
+          .description(
+              "Interval (in seconds, non-negative) to reconcile Spark 
applications. Note that "
+                  + "reconciliation is always expected to be triggered when 
app spec / status is "
+                  + "updated. This interval controls the reconcile behavior of 
operator "
+                  + "reconciliation even when there's no update on 
SparkApplication, e.g. to "
+                  + "determine whether a hanging app needs to be proactively 
terminated. Thus "
+                  + "this is recommended to set to above 2 minutes to avoid 
unnecessary no-op "
+                  + "reconciliation.")
+          .typeParameterClass(Long.class)
+          .defaultValue(120L)
+          .build();
+
+  public static final ConfigOption<Boolean> 
TRIM_ATTEMPT_STATE_TRANSITION_HISTORY =
+      ConfigOption.<Boolean>builder()
+          
.key("spark.kubernetes.operator.reconciler.trimStateTransitionHistoryEnabled")
+          .enableDynamicOverride(true)
+          .description(
+              "When enabled, operator would trim state transition history when 
a "
+                  + "new attempt starts, keeping previous attempt summary 
only.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(true)
+          .build();
+
+  public static final ConfigOption<String> 
SPARK_APP_STATUS_LISTENER_CLASS_NAMES =
+      ConfigOption.<String>builder()
+          
.key("spark.kubernetes.operator.reconciler.appStatusListenerClassNames")
+          .enableDynamicOverride(false)
+          .description("Comma-separated names of SparkAppStatusListener class 
implementations")
+          .typeParameterClass(String.class)
+          .defaultValue("")
+          .build();
+
+  public static final ConfigOption<Boolean> DYNAMIC_CONFIG_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.enabled")
+          .enableDynamicOverride(false)
+          .description(
+              "When enabled, operator would use config map as source of truth 
for config "
+                  + "property override. The config map need to be created in "
+                  + "spark.kubernetes.operator.namespace, and labeled with 
operator name.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(false)
+          .build();
+
+  public static final ConfigOption<String> DYNAMIC_CONFIG_SELECTOR =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.selector")
+          .enableDynamicOverride(false)
+          .description("The selector str applied to dynamic config map.")
+          .typeParameterClass(String.class)
+          .defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels()))
+          .build();
+
+  public static final ConfigOption<Integer> 
DYNAMIC_CONFIG_RECONCILER_PARALLELISM =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.reconcilerParallelism")
+          .enableDynamicOverride(false)
+          .description(
+              "Parallelism for dynamic config reconciler. Unbounded pool would 
be used "
+                  + "if set to non-positive number.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(1)
+          .build();
+
+  public static final ConfigOption<Integer> 
RECONCILER_RATE_LIMITER_REFRESH_PERIOD_SECONDS =
+      ConfigOption.<Integer>builder()
+          
.key("spark.kubernetes.operator.reconciler.rateLimiter.refreshPeriodSeconds")
+          .enableDynamicOverride(false)
+          .description("Operator rate limiter refresh period(in seconds) for 
each resource.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(15)
+          .build();
+
+  public static final ConfigOption<Integer> 
RECONCILER_RATE_LIMITER_MAX_LOOP_FOR_PERIOD =
+      ConfigOption.<Integer>builder()
+          
.key("spark.kubernetes.operator.reconciler.rateLimiter.maxLoopForPeriod")
+          .enableDynamicOverride(false)
+          .description(
+              "Max number of reconcile loops triggered within the rate limiter 
refresh "
+                  + "period for each resource. Setting the limit <= 0 disables 
the "
+                  + "limiter.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(5)
+          .build();
+
+  public static final ConfigOption<Integer> 
RECONCILER_RETRY_INITIAL_INTERVAL_SECONDS =
+      ConfigOption.<Integer>builder()
+          
.key("spark.kubernetes.operator.reconciler.retry.initialIntervalSeconds")
+          .enableDynamicOverride(false)
+          .description("Initial interval(in seconds) of retries on unhandled 
controller errors.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(5)
+          .build();
+
+  public static final ConfigOption<Double> 
RECONCILER_RETRY_INTERVAL_MULTIPLIER =
+      ConfigOption.<Double>builder()
+          .key("spark.kubernetes.operator.reconciler.retry.intervalMultiplier")
+          .enableDynamicOverride(false)
+          .description(
+              "Interval multiplier of retries on unhandled controller errors. 
Setting "
+                  + "this to 1 for linear retry.")
+          .typeParameterClass(Double.class)
+          .defaultValue(1.5)
+          .build();
+
+  public static final ConfigOption<Integer> 
RECONCILER_RETRY_MAX_INTERVAL_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.reconciler.retry.maxIntervalSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Max interval(in seconds) of retries on unhandled controller 
errors. "
+                  + "Set to non-positive for unlimited.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(-1)
+          .build();
+
+  public static final ConfigOption<Integer> API_RETRY_MAX_ATTEMPTS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.api.retryMaxAttempts")
+          .enableDynamicOverride(false)
+          .description(
+              "Max attempts of retries on unhandled controller errors. Setting 
this to "
+                  + "non-positive value means no retry.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(15)
+          .build();
+
+  public static final ConfigOption<Long> API_RETRY_ATTEMPT_AFTER_SECONDS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.api.retryAttemptAfterSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Default time (in seconds) to wait till next request. This would 
be used if "
+                  + "server does not set Retry-After in response. Setting this 
to non-positive "
+                  + "number means immediate retry.")
+          .typeParameterClass(Long.class)
+          .defaultValue(1L)
+          .build();
+
+  public static final ConfigOption<Long> API_STATUS_PATCH_MAX_ATTEMPTS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.api.statusPatchMaxAttempts")
+          .enableDynamicOverride(false)
+          .description(
+              "Maximal number of retry attempts of requests to k8s server for 
resource "
+                  + "status update. This would be performed on top of k8s 
client "
+                  + "spark.kubernetes.operator.retry.maxAttempts to overcome 
potential "
+                  + "conflicting update on the same SparkApplication. This 
should be positive "
+                  + "number.")
+          .typeParameterClass(Long.class)
+          .defaultValue(3L)
+          .build();
+
+  public static final ConfigOption<Long> 
API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS =
+      ConfigOption.<Long>builder()
+          
.key("spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts")
+          .enableDynamicOverride(false)
+          .description(
+              "Maximal number of retry attempts of requesting secondary 
resource for Spark "
+                  + "application. This would be performed on top of k8s client 
"
+                  + "spark.kubernetes.operator.retry.maxAttempts to overcome 
potential "
+                  + "conflicting reconcile on the same SparkApplication. This 
should be "
+                  + "positive number")
+          .typeParameterClass(Long.class)
+          .defaultValue(3L)
+          .build();
+
+  public static final ConfigOption<Boolean> JOSDK_METRICS_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.metrics.josdkMetricsEnabled")
+          .enableDynamicOverride(false)
+          .description(
+              "When enabled, the josdk metrics will be added in metrics source 
and "
+                  + "configured for operator.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(true)
+          .build();
+
+  public static final ConfigOption<Boolean> KUBERNETES_CLIENT_METRICS_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.metrics.clientMetricsEnabled")
+          .enableDynamicOverride(false)
+          .description(
+              "Enable KubernetesClient metrics for measuring the HTTP traffic 
to "
+                  + "the Kubernetes API Server. Since the metrics is collected 
"
+                  + "via Okhttp interceptors, can be disabled when opt in "
+                  + "customized interceptors.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(true)
+          .build();
+
+  public static final ConfigOption<Boolean>
+      KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED =
+          ConfigOption.<Boolean>builder()
+              
.key("spark.kubernetes.operator.metrics.clientMetricsGroupByResponseCodeEnabled")
+              .enableDynamicOverride(false)
+              .description(
+                  "When enabled, additional metrics group by http response 
code group(1xx, "
+                      + "2xx, 3xx, 4xx, 5xx) received from API server will be 
added. Users "
+                      + "can disable it when their monitoring system can 
combine lower level "
+                      + 
"kubernetes.client.http.response.<3-digit-response-code> metrics.")
+              .typeParameterClass(Boolean.class)
+              .defaultValue(true)
+              .build();
+
+  public static final ConfigOption<Integer> OPERATOR_METRICS_PORT =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.metrics.port")
+          .enableDynamicOverride(false)
+          .description("The port used for checking metrics")
+          .typeParameterClass(Integer.class)
+          .defaultValue(19090)
+          .build();
+
+  public static final ConfigOption<Integer> OPERATOR_PROBE_PORT =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.health.probePort")
+          .enableDynamicOverride(false)
+          .description("The port used for health/readiness check probe 
status.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(19091)
+          .build();
+
+  public static final ConfigOption<Integer> 
SENTINEL_EXECUTOR_SERVICE_POOL_SIZE =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.health.sentinelExecutorPoolSize")
+          .typeParameterClass(Integer.class)
+          .description(
+              "Size of executor service in Sentinel Managers to check the 
health "
+                  + "of sentinel resources.")
+          .defaultValue(3)
+          .enableDynamicOverride(false)
+          .build();
+
+  public static final ConfigOption<Integer> 
SENTINEL_RESOURCE_RECONCILIATION_DELAY =
+      ConfigOption.<Integer>builder()
+          
.key("spark.kubernetes.operator.health.sentinelResourceReconciliationDelaySeconds")
+          .enableDynamicOverride(true)
+          .description(
+              "Allowed max time(seconds) between spec update and 
reconciliation "
+                  + "for sentinel resources.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(60)
+          .build();
+
+  public static final ConfigOption<Boolean> LEADER_ELECTION_ENABLED =
+      ConfigOption.<Boolean>builder()
+          .key("spark.kubernetes.operator.leaderElection.enabled")
+          .enableDynamicOverride(false)
+          .description(
+              "Enable leader election for the operator to allow running 
standby instances. When "
+                  + "this is disabled, only one operator instance is expected 
to be up and "
+                  + "running at any time (replica = 1) to avoid race 
condition.")
+          .typeParameterClass(Boolean.class)
+          .defaultValue(false)
+          .build();
+
+  public static final ConfigOption<String> LEADER_ELECTION_LEASE_NAME =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.leaderElection.leaseName")
+          .enableDynamicOverride(false)
+          .description(
+              "Leader election lease name, must be unique for leases in the 
same namespace.")
+          .typeParameterClass(String.class)
+          .defaultValue("spark-operator-lease")
+          .build();
+
+  public static final ConfigOption<Integer> 
LEADER_ELECTION_LEASE_DURATION_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.leaderElection.leaseDurationSeconds")
+          .enableDynamicOverride(false)
+          .description("Leader election lease duration in seconds, 
non-negative.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(180)
+          .build();
+
+  public static final ConfigOption<Integer> 
LEADER_ELECTION_RENEW_DEADLINE_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.leaderElection.renewDeadlineSeconds")
+          .enableDynamicOverride(false)
+          .description(
+              "Leader election renew deadline in seconds, non-negative. This 
needs to be "
+                  + "smaller than the lease duration to allow current leader 
renew the lease "
+                  + "before lease expires.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(120)
+          .build();
+
+  public static final ConfigOption<Integer> 
LEADER_ELECTION_RETRY_PERIOD_SECONDS =
+      ConfigOption.<Integer>builder()
+          .key("spark.kubernetes.operator.leaderElection.retryPeriodSeconds")
+          .enableDynamicOverride(false)
+          .description("Leader election retry period in seconds, 
non-negative.")
+          .typeParameterClass(Integer.class)
+          .defaultValue(5)
+          .build();
+
+  public static LeaderElectionConfiguration getLeaderElectionConfig() {
+    return new LeaderElectionConfiguration(
+        LEADER_ELECTION_LEASE_NAME.getValue(),
+        OPERATOR_NAMESPACE.getValue(),
+        
Duration.ofSeconds(ensurePositiveIntFor(LEADER_ELECTION_LEASE_DURATION_SECONDS)),
+        
Duration.ofSeconds(ensurePositiveIntFor(LEADER_ELECTION_RENEW_DEADLINE_SECONDS)),
+        
Duration.ofSeconds(ensurePositiveIntFor(LEADER_ELECTION_RETRY_PERIOD_SECONDS)));
+  }
+
+  public static GenericRetry getOperatorRetry() {
+    GenericRetry genericRetry =
+        new GenericRetry()
+            .setMaxAttempts(ensureNonNegativeIntFor(API_RETRY_MAX_ATTEMPTS))
+            .setInitialInterval(
+                Duration.ofSeconds(
+                        
ensureNonNegativeIntFor(RECONCILER_RETRY_INITIAL_INTERVAL_SECONDS))
+                    .toMillis())
+            
.setIntervalMultiplier(RECONCILER_RETRY_INTERVAL_MULTIPLIER.getValue());
+    if (RECONCILER_RETRY_MAX_INTERVAL_SECONDS.getValue() > 0) {
+      genericRetry.setMaxInterval(
+          
Duration.ofSeconds(RECONCILER_RETRY_MAX_INTERVAL_SECONDS.getValue()).toMillis());
+    } else {
+      log.info("Reconciler retry policy is configured with unlimited max 
attempts");
+    }
+    return genericRetry;
+  }
+
+  public static RateLimiter<?> getOperatorRateLimiter() {
+    return new LinearRateLimiter(
+        
Duration.ofSeconds(ensureNonNegativeIntFor(RECONCILER_RATE_LIMITER_REFRESH_PERIOD_SECONDS)),
+        ensureNonNegativeIntFor(RECONCILER_RATE_LIMITER_MAX_LOOP_FOR_PERIOD));
+  }
+
+  private static int ensureNonNegativeIntFor(ConfigOption<Integer> 
configOption) {
+    return ensureValid(configOption.getValue(), configOption.getDescription(), 
0, 0);
+  }
+
+  private static int ensurePositiveIntFor(ConfigOption<Integer> configOption) {
+    return ensureValid(configOption.getValue(), configOption.getDescription(), 
1, 1);
+  }
+
+  private static int ensureValid(int value, String description, int minValue, 
int defaultValue) {
+    if (value < minValue) {
+      if (defaultValue < minValue) {
+        throw new IllegalArgumentException(
+            "Default value for " + description + " must be greater than " + 
minValue);
+      }
+      log.warn(
+          "Requested {} should be greater than {}. Requested: {}, using {} 
(default) instead",
+          description,
+          minValue,
+          value,
+          defaultValue);
+      return defaultValue;
+    }
+    return value;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/BaseStatusListener.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/BaseStatusListener.java
new file mode 100644
index 0000000..17076b8
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/BaseStatusListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.spark.k8s.operator.listeners;
+
+import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.status.BaseStatus;
+
+/** Custom listeners, if added, would be listening to resource status change */
+public abstract class BaseStatusListener<
+    STATUS extends BaseStatus<?, ?, ?>, CR extends BaseResource<?, ?, ?, ?, 
STATUS>> {
+  public abstract void listenStatus(CR resource, STATUS prevStatus, STATUS 
updatedStatus);
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/SparkAppStatusListener.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/SparkAppStatusListener.java
new file mode 100644
index 0000000..018e059
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/listeners/SparkAppStatusListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.spark.k8s.operator.listeners;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+
/**
 * Custom listeners, if added, would be listening to Spark App status change. This is the
 * {@link SparkApplication}-typed specialization of {@link BaseStatusListener}; concrete
 * listeners extend this class and implement {@code listenStatus}.
 */
public abstract class SparkAppStatusListener
    extends BaseStatusListener<ApplicationStatus, SparkApplication> {}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/JVMMetricSet.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/JVMMetricSet.java
new file mode 100644
index 0000000..a87d5bd
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/JVMMetricSet.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+
+public class JVMMetricSet implements MetricSet {
+
+  /** Refer codahale FileDescriptorRatioGauge for the definition */
+  public static final String FILE_DESC_RATIO_NAME = "fileDesc.ratio.open/max";
+
+  public static final String PREFIX_BUFFER_POOL = "bufferPool";
+  public static final String PREFIX_GC = "gc";
+  public static final String PREFIX_MEMORY_USAGE = "memoryUsage";
+  public static final String PREFIX_THREADS_STATES = "threadStates";
+  private final BufferPoolMetricSet bufferPoolMetricSet;
+  private final FileDescriptorRatioGauge fileDescriptorRatioGauge;
+  private final GarbageCollectorMetricSet garbageCollectorMetricSet;
+  private final MemoryUsageGaugeSet memoryUsageGaugeSet;
+  private final ThreadStatesGaugeSet threadStatesGaugeSet;
+
+  public JVMMetricSet() {
+    bufferPoolMetricSet = new 
BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer());
+    fileDescriptorRatioGauge = new FileDescriptorRatioGauge();
+    garbageCollectorMetricSet = new GarbageCollectorMetricSet();
+    memoryUsageGaugeSet = new MemoryUsageGaugeSet();
+    threadStatesGaugeSet = new ThreadStatesGaugeSet();
+  }
+
+  @Override
+  public Map<String, Metric> getMetrics() {
+    final Map<String, Metric> jvmMetrics = new HashMap<>();
+    putAllMetrics(jvmMetrics, bufferPoolMetricSet, PREFIX_BUFFER_POOL);
+    jvmMetrics.put(FILE_DESC_RATIO_NAME, fileDescriptorRatioGauge);
+    putAllMetrics(jvmMetrics, garbageCollectorMetricSet, PREFIX_GC);
+    putAllMetrics(jvmMetrics, memoryUsageGaugeSet, PREFIX_MEMORY_USAGE);
+    putAllMetrics(jvmMetrics, threadStatesGaugeSet, PREFIX_THREADS_STATES);
+    return jvmMetrics;
+  }
+
+  private void putAllMetrics(
+      final Map<String, Metric> destination, final MetricSet origin, final 
String prefix) {
+    for (Map.Entry<String, Metric> entry : origin.getMetrics().entrySet()) {
+      destination.put(prefix + "." + entry.getKey(), entry.getValue());
+    }
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsService.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsService.java
new file mode 100644
index 0000000..a4af354
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_METRICS_PORT;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executor;
+
+import com.sun.net.httpserver.HttpServer;
+import lombok.extern.slf4j.Slf4j;
+
+/** Start Http service at endpoint /prometheus, exposing operator metrics */
+@Slf4j
+public class MetricsService {
+  HttpServer server;
+  MetricsSystem metricsSystem;
+
+  public MetricsService(MetricsSystem metricsSystem, Executor executor) {
+    this.metricsSystem = metricsSystem;
+    try {
+      server = HttpServer.create(new 
InetSocketAddress(OPERATOR_METRICS_PORT.getValue()), 0);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create Metrics Server", e);
+    }
+    server.setExecutor(executor);
+  }
+
+  public void start() {
+    log.info("Starting Metrics Service for Prometheus ...");
+    server.createContext("/prometheus", 
metricsSystem.getPrometheusPullModelHandler());
+    server.start();
+  }
+
+  public void stop() {
+    log.info("Metrics Service stopped");
+    server.stop(0);
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java
new file mode 100644
index 0000000..5dcd61b
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.metrics.source.OperatorJvmSource;
+import org.apache.spark.metrics.sink.Sink;
+import org.apache.spark.metrics.source.Source;
+
+@Slf4j
+public class MetricsSystem {
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  @Getter private final Set<Sink> sinks;
+  @Getter private final Set<Source> sources;
+  @Getter private final MetricRegistry registry;
+  @Getter private final Properties properties;
+  // PrometheusPullModelHandler is registered by default, metrics exposed via 
http port
+  @Getter private final PrometheusPullModelHandler prometheusPullModelHandler;
+  private final Map<String, SinkProperties> sinkPropertiesMap;
+
+  public MetricsSystem() {
+    this(new Properties());
+  }
+
+  public MetricsSystem(Properties properties) {
+    this.sources = new HashSet<>();
+    this.sinks = new HashSet<>();
+    this.registry = new MetricRegistry();
+    this.properties = properties;
+    this.sinkPropertiesMap = 
MetricsSystemFactory.parseSinkProperties(this.properties);
+    // Add default sinks
+    this.prometheusPullModelHandler = new PrometheusPullModelHandler(new 
Properties(), registry);
+    this.sinks.add(prometheusPullModelHandler);
+  }
+
+  public void start() {
+    if (running.get()) {
+      throw new IllegalStateException(
+          "Attempting to start a MetricsSystem that is already running");
+    }
+    running.set(true);
+    registerDefaultSources();
+    registerSinks();
+    sinks.forEach(Sink::start);
+  }
+
+  public void stop() {
+    if (running.get()) {
+      sinks.forEach(Sink::stop);
+      registry.removeMatching(MetricFilter.ALL);
+    } else {
+      log.error("Stopping a MetricsSystem that is not running");
+    }
+    running.set(false);
+  }
+
+  public void report() {
+    sinks.forEach(Sink::report);
+  }
+
+  protected void registerDefaultSources() {
+    registerSource(new OperatorJvmSource());
+  }
+
+  protected void registerSinks() {
+    log.info("sinkPropertiesMap: {}", sinkPropertiesMap);
+    sinkPropertiesMap
+        .values()
+        .forEach(
+            sinkProp -> {
+              try {
+                Class<Sink> sinkClass = (Class<Sink>) 
Class.forName(sinkProp.getClassName());
+                Sink sinkInstance;
+                sinkInstance =
+                    sinkClass
+                        .getConstructor(Properties.class, MetricRegistry.class)
+                        .newInstance(sinkProp.getProperties(), registry);
+                sinks.add(sinkInstance);
+              } catch (InstantiationException
+                  | IllegalAccessException
+                  | IllegalArgumentException
+                  | InvocationTargetException
+                  | NoSuchMethodException
+                  | SecurityException
+                  | ClassNotFoundException e) {
+                if (log.isErrorEnabled()) {
+                  log.error(
+                      "Fail to create metrics sink for sink name {}, sink 
properties {}",
+                      sinkProp.getClassName(),
+                      sinkProp.getProperties());
+                }
+                throw new RuntimeException("Fail to create metrics sink", e);
+              }
+            });
+  }
+
+  public void registerSource(Source source) {
+    sources.add(source);
+    try {
+      String regName = MetricRegistry.name(source.sourceName());
+      registry.register(regName, source.metricRegistry());
+    } catch (IllegalArgumentException e) {
+      log.error("Metrics already registered", e);
+    }
+  }
+
+  @Data
+  public static class SinkProperties {
+    String className;
+    Properties properties;
+
+    public SinkProperties() {
+      this.className = "";
+      this.properties = new Properties();
+    }
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactory.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactory.java
new file mode 100644
index 0000000..88d357c
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.k8s.operator.config.SparkOperatorConfManager;
+
+/** Factory for MetricsSystem */
+public class MetricsSystemFactory {
+  public static final String METRIC_PREFIX = "spark.metrics.conf.operator.";
+  public static final String SINK = "sink.";
+  public static final String CLASS = "class";
+
+  public static MetricsSystem createMetricsSystem() {
+    Properties properties =
+        
parseMetricsProperties(SparkOperatorConfManager.INSTANCE.getMetricsProperties());
+    return new MetricsSystem(properties);
+  }
+
+  public static MetricsSystem createMetricsSystem(Properties properties) {
+    return new MetricsSystem(properties);
+  }
+
+  private static Properties parseMetricsProperties(Properties userProperties) {
+    Properties properties = new Properties();
+    Enumeration<?> valueEnumeration = userProperties.propertyNames();
+    while (valueEnumeration.hasMoreElements()) {
+      String key = (String) valueEnumeration.nextElement();
+      if (key.startsWith(METRIC_PREFIX)) {
+        properties.put(key.substring(METRIC_PREFIX.length()), 
userProperties.getProperty(key));
+      }
+    }
+    return properties;
+  }
+
+  public static Map<String, MetricsSystem.SinkProperties> parseSinkProperties(
+      Properties metricsProperties) {
+    Map<String, MetricsSystem.SinkProperties> propertiesMap = new HashMap<>();
+    // e.g: "sink.graphite.class"="org.apache.spark.metrics.sink.ConsoleSink"
+    Enumeration<?> valueEnumeration = metricsProperties.propertyNames();
+    while (valueEnumeration.hasMoreElements()) {
+      String key = (String) valueEnumeration.nextElement();
+      int firstDotIndex = StringUtils.ordinalIndexOf(key, ".", 1);
+      int secondDotIndex = StringUtils.ordinalIndexOf(key, ".", 2);
+      if (key.startsWith(SINK)) {
+        String shortName = key.substring(firstDotIndex + 1, secondDotIndex);
+        MetricsSystem.SinkProperties sinkProperties =
+            propertiesMap.getOrDefault(shortName, new 
MetricsSystem.SinkProperties());
+        if (key.endsWith(CLASS)) {
+          sinkProperties.setClassName(metricsProperties.getProperty(key));
+        } else {
+          sinkProperties
+              .getProperties()
+              .put(key.substring(secondDotIndex + 1), 
metricsProperties.getProperty(key));
+        }
+        propertiesMap.put(shortName, sinkProperties);
+      }
+    }
+    sinkPropertiesSanityCheck(propertiesMap);
+    return propertiesMap;
+  }
+
+  private static void sinkPropertiesSanityCheck(
+      Map<String, MetricsSystem.SinkProperties> sinkPropsMap) {
+    for (Map.Entry<String, MetricsSystem.SinkProperties> pair : 
sinkPropsMap.entrySet()) {
+      // Each Sink should have mapping class full name
+      if (StringUtils.isBlank(pair.getValue().className)) {
+        String errorMessage =
+            String.format(
+                "%s provides properties, but does not provide full class 
name", pair.getKey());
+        throw new RuntimeException(errorMessage);
+      }
+      // Check the existence of each class full name
+      try {
+        Class.forName(pair.getValue().getClassName());
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(
+            String.format("Fail to find class %s", 
pair.getValue().getClassName()), e);
+      }
+    }
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java
new file mode 100644
index 0000000..73bb49b
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static org.apache.spark.k8s.operator.utils.ProbeUtil.sendMessage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import com.codahale.metrics.MetricRegistry;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import jakarta.servlet.http.HttpServletRequest;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.metrics.sink.PrometheusServlet;
+
+/** Serves as simple Prometheus sink (pull model), presenting metrics snapshot 
as HttpHandler */
+@Slf4j
+public class PrometheusPullModelHandler extends PrometheusServlet implements 
HttpHandler {
+  private static final String EMPTY_RECORD_VALUE = "[]";
+
+  public PrometheusPullModelHandler(Properties properties, MetricRegistry 
registry) {
+    super(properties, registry);
+  }
+
+  @Override
+  public void start() {
+    log.info("PrometheusPullModelHandler started");
+  }
+
+  @Override
+  public void stop() {
+    log.info("PrometheusPullModelHandler stopped");
+  }
+
+  @Override
+  public void handle(HttpExchange exchange) throws IOException {
+    HttpServletRequest httpServletRequest = null;
+    String value = getMetricsSnapshot(httpServletRequest);
+    sendMessage(exchange, 200, String.join("\n", 
filterNonEmptyRecords(value)));
+  }
+
+  protected List<String> filterNonEmptyRecords(String metricsSnapshot) {
+    // filter out empty records which could cause Prometheus invalid syntax 
exception, e.g:
+    // metrics_jvm_threadStates_deadlocks_Number{type="gauges"} []
+    // metrics_jvm_threadStates_deadlocks_Value{type="gauges"} []
+    String[] records = metricsSnapshot.split("\n");
+    List<String> filteredRecords = new ArrayList<>();
+    for (String record : records) {
+      String[] keyValuePair = record.split(" ");
+      if (EMPTY_RECORD_VALUE.equals(keyValuePair[1])) {
+        continue;
+      }
+      filteredRecords.add(record);
+    }
+    return filteredRecords;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
new file mode 100644
index 0000000..5684b62
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.spark.metrics.source.Source;
+
+@Slf4j
+public class KubernetesMetricsInterceptor implements Interceptor, Source {
+  MetricRegistry metricRegistry;
+  public static final String NAMESPACES = "namespaces";
+  public static final String HTTP_REQUEST_GROUP = "http.request";
+  public static final String HTTP_REQUEST_FAILED_GROUP = "failed";
+  public static final String HTTP_RESPONSE_GROUP = "http.response";
+  public static final String HTTP_RESPONSE_1XX = "1xx";
+  public static final String HTTP_RESPONSE_2XX = "2xx";
+  public static final String HTTP_RESPONSE_3XX = "3xx";
+  public static final String HTTP_RESPONSE_4XX = "4xx";
+  public static final String HTTP_RESPONSE_5XX = "5xx";
+  private final Histogram responseLatency;
+  private final Map<Integer, Meter> responseCodeMeters = new 
ConcurrentHashMap<>();
+  private final Map<String, Meter> requestMethodCounter = new 
ConcurrentHashMap<>();
+  private final List<Meter> responseCodeGroupMeters = new ArrayList<>(5);
+  private final Meter requestFailedRateMeter;
+  private final Meter requestRateMeter;
+  private final Meter responseRateMeter;
+  private final Map<String, Meter> namespacedResourceMethodMeters = new 
ConcurrentHashMap<>();
+
+  public KubernetesMetricsInterceptor() {
+    metricRegistry = new MetricRegistry();
+
+    responseLatency =
+        metricRegistry.histogram(
+            MetricRegistry.name(HTTP_RESPONSE_GROUP, "latency", 
"nanos").toLowerCase());
+    requestFailedRateMeter =
+        
metricRegistry.meter(MetricRegistry.name(HTTP_REQUEST_FAILED_GROUP).toLowerCase());
+    requestRateMeter = 
metricRegistry.meter(MetricRegistry.name(HTTP_REQUEST_GROUP).toLowerCase());
+    responseRateMeter =
+        
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_GROUP).toLowerCase());
+
+    if 
(KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED.getValue()) {
+      responseCodeGroupMeters.add(
+          
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_1XX).toLowerCase()));
+      responseCodeGroupMeters.add(
+          
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_2XX).toLowerCase()));
+      responseCodeGroupMeters.add(
+          
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_3XX).toLowerCase()));
+      responseCodeGroupMeters.add(
+          
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_4XX).toLowerCase()));
+      responseCodeGroupMeters.add(
+          
metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_5XX).toLowerCase()));
+    }
+  }
+
+  @NotNull
+  @Override
+  public Response intercept(@NotNull Chain chain) throws IOException {
+    Request request = chain.request();
+    updateRequestMetrics(request);
+    Response response = null;
+    final long startTime = System.nanoTime();
+    try {
+      response = chain.proceed(request);
+      return response;
+    } finally {
+      updateResponseMetrics(response, startTime);
+    }
+  }
+
+  @Override
+  public String sourceName() {
+    return "kubernetes.client";
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return this.metricRegistry;
+  }
+
+  private void updateRequestMetrics(Request request) {
+    this.requestRateMeter.mark();
+    getMeterByRequestMethod(request.method()).mark();
+    Optional<Pair<String, String>> resourceNamePairOptional =
+        parseNamespaceScopedResource(request.url().uri().getPath());
+    resourceNamePairOptional.ifPresent(
+        pair -> {
+          getMeterByRequestMethodAndResourceName(pair.getValue(), 
request.method()).mark();
+          getMeterByRequestMethodAndResourceName(
+                  pair.getKey() + "." + pair.getValue(), request.method())
+              .mark();
+        });
+  }
+
+  private void updateResponseMetrics(Response response, long startTimeNanos) {
+    final long latency = System.nanoTime() - startTimeNanos;
+    if (response != null) {
+      this.responseRateMeter.mark();
+      this.responseLatency.update(latency);
+      getMeterByResponseCode(response.code()).mark();
+      if 
(KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED.getValue()) {
+        responseCodeGroupMeters.get(response.code() / 100 - 1).mark();
+      }
+    } else {
+      this.requestFailedRateMeter.mark();
+    }
+  }
+
+  private Meter getMeterByRequestMethod(String method) {
+    return requestMethodCounter.computeIfAbsent(
+        method,
+        key -> metricRegistry.meter(MetricRegistry.name(HTTP_REQUEST_GROUP, 
method).toLowerCase()));
+  }
+
+  private Meter getMeterByRequestMethodAndResourceName(String resourceName, 
String method) {
+    String metricsName = MetricRegistry.name(resourceName, method);
+    return namespacedResourceMethodMeters.computeIfAbsent(
+        metricsName, key -> metricRegistry.meter(metricsName.toLowerCase()));
+  }
+
+  private Meter getMeterByResponseCode(int code) {
+    return responseCodeMeters.computeIfAbsent(
+        code,
+        key ->
+            metricRegistry.meter(MetricRegistry.name(HTTP_RESPONSE_GROUP, 
String.valueOf(code))));
+  }
+
+  public Optional<Pair<String, String>> parseNamespaceScopedResource(String 
path) {
+    if (path.contains(NAMESPACES)) {
+      int index = path.indexOf(NAMESPACES) + NAMESPACES.length();
+      String namespaceAndResources = path.substring(index + 1);
+      String[] parts = namespaceAndResources.split("/");
+      return Optional.of(Pair.of(parts[0], parts[1]));
+    } else {
+      return Optional.empty();
+    }
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
new file mode 100644
index 0000000..6eb8867
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static 
io.javaoperatorsdk.operator.api.reconciler.Constants.CONTROLLER_NAME;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.Controller;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import 
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
+import 
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.metrics.source.Source;
+import org.apache.spark.util.Clock;
+import org.apache.spark.util.SystemClock;
+
+@Slf4j
+public class OperatorJosdkMetrics implements Source, Metrics {
+  public static final String FINISHED = "finished";
+  public static final String CLEANUP = "cleanup";
+  public static final String FAILED = "failed";
+  public static final String RETRIES = "retries";
+  private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
+  private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+  private final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
+  private static final String RECONCILIATION = "reconciliation";
+  private static final String RESOURCE = "resource";
+  private static final String EVENT = "event";
+  private static final String SUCCESS = "success";
+  private static final String FAILURE = "failure";
+  private static final String EXCEPTION = "exception";
+  private static final String PREFIX = "operator.sdk";
+  private static final String RECONCILIATIONS = "reconciliations";
+  private static final String RECONCILIATIONS_EXECUTIONS = RECONCILIATIONS + 
".executions";
+  private static final String RECONCILIATIONS_QUEUE_SIZE = RECONCILIATIONS + 
".queue.size";
+  private static final String SIZE = "size";
+
+  private final Clock clock;
+  private final MetricRegistry metricRegistry;
+
+  public OperatorJosdkMetrics() {
+    this.clock = new SystemClock();
+    this.metricRegistry = new MetricRegistry();
+  }
+
+  @Override
+  public String sourceName() {
+    return PREFIX;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+
+  @Override
+  public void controllerRegistered(Controller<? extends HasMetadata> 
controller) {
+    // no-op
+    log.debug("Controller has been registered");
+  }
+
+  @Override
+  public void receivedEvent(Event event, Map<String, Object> metadata) {
+    log.debug("received event {}, metadata {}", event, metadata);
+    if (event instanceof ResourceEvent) {
+      final ResourceAction action = ((ResourceEvent) event).getAction();
+      final Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> resource =
+          getResourceClass(metadata);
+      final Optional<String> namespaceOptional = 
event.getRelatedCustomResourceID().getNamespace();
+      resource.ifPresent(
+          aClass -> getCounter(aClass, action.name().toLowerCase(), RESOURCE, 
EVENT).inc());
+      if (resource.isPresent() && namespaceOptional.isPresent()) {
+        getCounter(
+                resource.get(),
+                namespaceOptional.get(),
+                action.name().toLowerCase(),
+                RESOURCE,
+                EVENT)
+            .inc();
+      }
+    }
+  }
+
+  @Override
+  public <T> T timeControllerExecution(ControllerExecution<T> execution) 
throws Exception {
+    log.debug("Time controller execution");
+    final String name = execution.controllerName();
+    final ResourceID resourceID = execution.resourceID();
+    final Optional<String> namespaceOptional = resourceID.getNamespace();
+    final Map<String, Object> metadata = execution.metadata();
+    final Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> resourceClass 
=
+        getResourceClass(metadata);
+    final String execName = execution.name();
+
+    long startTime = clock.getTimeMillis();
+    try {
+      T result = execution.execute();
+      final String successType = execution.successTypeName(result);
+      if (resourceClass.isPresent()) {
+        getHistogram(resourceClass.get(), name, execName, 
successType).update(toSeconds(startTime));
+        getCounter(resourceClass.get(), name, execName, SUCCESS, 
successType).inc();
+        if (namespaceOptional.isPresent()) {
+          getHistogram(resourceClass.get(), namespaceOptional.get(), name, 
execName, successType)
+              .update(toSeconds(startTime));
+          getCounter(
+                  resourceClass.get(),
+                  namespaceOptional.get(),
+                  name,
+                  execName,
+                  SUCCESS,
+                  successType)
+              .inc();
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      log.error(
+          "Controller execution failed for resource {}, metadata {}", 
resourceID, metadata, e);
+      final String exception = e.getClass().getSimpleName();
+      if (resourceClass.isPresent()) {
+        getHistogram(resourceClass.get(), name, execName, 
FAILURE).update(toSeconds(startTime));
+        getCounter(resourceClass.get(), name, execName, FAILURE, EXCEPTION, 
exception).inc();
+        if (namespaceOptional.isPresent()) {
+          getHistogram(resourceClass.get(), namespaceOptional.get(), name, 
execName, FAILURE)
+              .update(toSeconds(startTime));
+          getCounter(
+                  resourceClass.get(),
+                  namespaceOptional.get(),
+                  name,
+                  execName,
+                  FAILURE,
+                  EXCEPTION,
+                  exception)
+              .inc();
+        }
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public void reconcileCustomResource(
+      HasMetadata resource, RetryInfo retryInfo, Map<String, Object> metadata) 
{
+    log.debug(
+        "Reconcile custom resource {}, with retryInfo {} metadata {}",
+        resource,
+        retryInfo,
+        metadata);
+    if (retryInfo != null) {
+      final String namespace = resource.getMetadata().getNamespace();
+      getCounter(resource.getClass(), RECONCILIATION, RETRIES).inc();
+      getCounter(resource.getClass(), namespace, RECONCILIATION, 
RETRIES).inc();
+    }
+    getCounter(
+            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), 
RECONCILIATIONS_QUEUE_SIZE)
+        .inc();
+  }
+
+  @Override
+  public void failedReconciliation(
+      HasMetadata resource, Exception exception, Map<String, Object> metadata) 
{
+    log.error(
+        "Failed reconciliation for resource {} with metadata {}", resource, 
exception, exception);
+    getCounter(resource.getClass(), RECONCILIATION, FAILED).inc();
+    getCounter(resource.getClass(), resource.getMetadata().getNamespace(), 
RECONCILIATION, FAILED)
+        .inc();
+  }
+
+  @Override
+  public void finishedReconciliation(HasMetadata resource, Map<String, Object> 
metadata) {
+    log.debug("Finished reconciliation for resource {} with metadata {}", 
resource, metadata);
+    getCounter(resource.getClass(), RECONCILIATION, FINISHED).inc();
+    getCounter(
+        resource.getClass(), resource.getMetadata().getNamespace(), 
RECONCILIATION, FINISHED);
+  }
+
+  @Override
+  public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> 
metadata) {
+    log.debug("Cleanup Done for resource {} with metadata {}", resourceID, 
metadata);
+    getCounter(resourceID.getClass(), RECONCILIATION, CLEANUP).inc();
+    resourceID
+        .getNamespace()
+        .ifPresent(ns -> getCounter(resourceID.getClass(), ns, RECONCILIATION, 
CLEANUP).inc());
+  }
+
+  @Override
+  public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
+    log.debug("Monitor size for {}", name);
+    Gauge<Integer> gauge =
+        new Gauge<>() {
+          @Override
+          public Integer getValue() {
+            return map.size();
+          }
+        };
+    gauges.put(MetricRegistry.name(name, SIZE), gauge);
+    return map;
+  }
+
+  @Override
+  public void reconciliationExecutionStarted(HasMetadata resource, Map<String, 
Object> metadata) {
+    log.debug("Reconciliation execution started");
+    String namespace = resource.getMetadata().getNamespace();
+    getCounter(
+            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), 
RECONCILIATIONS_EXECUTIONS)
+        .inc();
+    getCounter(
+            resource.getClass(),
+            namespace,
+            (String) metadata.get(CONTROLLER_NAME),
+            RECONCILIATIONS_EXECUTIONS)
+        .inc();
+  }
+
+  @Override
+  public void reconciliationExecutionFinished(HasMetadata resource, 
Map<String, Object> metadata) {
+    log.debug("Reconciliation execution finished");
+    String namespace = resource.getMetadata().getNamespace();
+    getCounter(
+            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), 
RECONCILIATIONS_EXECUTIONS)
+        .dec();
+    getCounter(
+            resource.getClass(),
+            namespace,
+            (String) metadata.get(CONTROLLER_NAME),
+            RECONCILIATIONS_EXECUTIONS)
+        .dec();
+    getCounter(
+            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), 
RECONCILIATIONS_QUEUE_SIZE)
+        .dec();
+  }
+
+  private long toSeconds(long startTimeInMilliseconds) {
+    return TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis() - 
startTimeInMilliseconds);
+  }
+
+  private Histogram getHistogram(Class<?> kclass, String... names) {
+    String name = MetricRegistry.name(kclass.getSimpleName(), 
names).toLowerCase();
+    Histogram histogram;
+    if (!histograms.containsKey(name)) {
+      histogram = metricRegistry.histogram(name);
+      histograms.put(name, histogram);
+    } else {
+      histogram = histograms.get(name);
+    }
+    return histogram;
+  }
+
+  private Counter getCounter(Class<?> klass, String... names) {
+    String name = MetricRegistry.name(klass.getSimpleName(), 
names).toLowerCase();
+    Counter counter;
+    if (!counters.containsKey(name)) {
+      counter = metricRegistry.counter(name);
+      counters.put(name, counter);
+    } else {
+      counter = counters.get(name);
+    }
+    return counter;
+  }
+
+  private Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> 
getResourceClass(
+      Map<String, Object> metadata) {
+    GroupVersionKind resourceGvk = (GroupVersionKind) 
metadata.get(Constants.RESOURCE_GVK_KEY);
+
+    if (resourceGvk == null) {
+      return Optional.empty();
+    }
+
+    Class<? extends BaseResource<?, ?, ?, ?, ?>> resourceClass;
+
+    if (resourceGvk.getKind().equals(SparkApplication.class.getSimpleName())) {
+      resourceClass = SparkApplication.class;
+    } else {
+      return Optional.empty();
+    }
+    return Optional.of(resourceClass);
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJvmSource.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJvmSource.java
new file mode 100644
index 0000000..8cf7df0
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJvmSource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.spark.k8s.operator.metrics.JVMMetricSet;
+import org.apache.spark.metrics.source.Source;
+
+public class OperatorJvmSource implements Source {
+  @Override
+  public String sourceName() {
+    return "jvm";
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    MetricRegistry metricRegistry = new MetricRegistry();
+    metricRegistry.registerAll(new JVMMetricSet());
+    return metricRegistry;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ClassLoadingUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ClassLoadingUtils.java
new file mode 100644
index 0000000..1d89a77
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ClassLoadingUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.k8s.operator.listeners.BaseStatusListener;
+
+@Slf4j
+public class ClassLoadingUtils {
+  /**
+   * load status listener classes for given type of BaseStatusListener
+   *
+   * @param clazz Class name of status listener. e.g. SparkAppStatusListener
+   * @param implementationClassNames Comma-separated implementation class 
names of given type of
+   *     StatusListener
+   * @return list of listeners loaded
+   * @param <T> Class of status listener. e.g. SparkAppStatusListener
+   */
+  public static <T extends BaseStatusListener<?, ?>> List<T> getStatusListener(
+      Class<T> clazz, String implementationClassNames) {
+    List<T> listeners = new ArrayList<>();
+    if (StringUtils.isNotBlank(implementationClassNames)) {
+      try {
+        Set<String> listenerNames = 
Utils.sanitizeCommaSeparatedStrAsSet(implementationClassNames);
+        for (String name : listenerNames) {
+          Class listenerClass = Class.forName(name);
+          if (clazz.isAssignableFrom(listenerClass)) {
+            listeners.add((T) listenerClass.getConstructor().newInstance());
+          }
+        }
+      } catch (ClassNotFoundException
+          | NoSuchMethodException
+          | InstantiationException
+          | IllegalAccessException
+          | InvocationTargetException e) {
+        if (log.isErrorEnabled()) {
+          log.error(
+              "Failed to initialize listeners for operator with {}", 
implementationClassNames, e);
+        }
+      }
+    }
+    return listeners;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/LoggingUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/LoggingUtils.java
new file mode 100644
index 0000000..d205019
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/LoggingUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.MDC;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
+
+public class LoggingUtils {
+  public static final class TrackedMDC {
+    public static final String AppAttemptIdKey = "resource.app.attemptId";
+    private final ReentrantLock lock = new ReentrantLock();
+    private Set<String> keys = new HashSet<>();
+
+    public void set(final SparkApplication application) {
+      if (application != null && application.getStatus() != null) {
+        try {
+          lock.lock();
+          ApplicationAttemptSummary summary = 
application.getStatus().getCurrentAttemptSummary();
+          if (summary != null && summary.getAttemptInfo() != null) {
+            MDC.put(AppAttemptIdKey, 
String.valueOf(summary.getAttemptInfo().getId()));
+            keys.add(AppAttemptIdKey);
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+
+    public void reset() {
+      try {
+        lock.lock();
+        for (String mdcKey : keys) {
+          MDC.remove(mdcKey);
+        }
+        keys.clear();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodPhase.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodPhase.java
new file mode 100644
index 0000000..74c7d66
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodPhase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Defines common (driver) pod phases that may need to be handled explicitly 
by the operator. For
+ * pod phase definition: {@see
+ * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase}
+ */
+@Slf4j
+public enum PodPhase {
+  PENDING("pending"),
+  RUNNING("running"),
+  FAILED("failed"),
+  SUCCEEDED("succeeded"),
+  UNKNOWN("unknown");
+
+  private final String description;
+
+  PodPhase(String description) {
+    this.description = description;
+  }
+
+  public static PodPhase getPhase(final Pod pod) {
+    if (pod == null || pod.getStatus() == null) {
+      return UNKNOWN;
+    }
+    for (PodPhase podPhase : values()) {
+      if (podPhase.description.equalsIgnoreCase(pod.getStatus().getPhase())) {
+        return podPhase;
+      }
+    }
+    if (log.isErrorEnabled()) {
+      log.error("Unable to determine pod phase from status: {}", 
pod.getStatus());
+    }
+    return UNKNOWN;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodUtils.java
new file mode 100644
index 0000000..9df3f53
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/PodUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static 
org.apache.spark.k8s.operator.utils.ModelUtils.findDriverMainContainerStatus;
+
+import java.util.List;
+
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.Pod;
+
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+
+public class PodUtils {
+
+  public static final String POD_READY_CONDITION_TYPE = "ready";
+
+  /** Determine whether given pod is up running and ready */
+  public static boolean isPodReady(final Pod pod) {
+    if (!PodPhase.RUNNING.equals(PodPhase.getPhase(pod))) {
+      return false;
+    }
+    if (pod == null
+        || pod.getStatus() == null
+        || pod.getStatus().getConditions() == null
+        || pod.getStatus().getConditions().isEmpty()) {
+      return false;
+    }
+    return pod.getStatus().getConditions().parallelStream()
+        .anyMatch(
+            condition ->
+                POD_READY_CONDITION_TYPE.equalsIgnoreCase(condition.getType())
+                    && Boolean.parseBoolean(condition.getStatus()));
+  }
+
+  /**
+   * Determine whether the driver pod is started. Driver is considered as 
'started' if any of Spark
+   * container is started and ready
+   *
+   * @param driver the driver pod
+   * @param spec expected spec for the SparkApp
+   */
+  public static boolean isDriverPodStarted(final Pod driver, final 
ApplicationSpec spec) {
+    // Consider pod as 'started' if any of Spark container is started and ready
+    if (driver == null
+        || driver.getStatus() == null
+        || driver.getStatus().getContainerStatuses() == null
+        || driver.getStatus().getContainerStatuses().isEmpty()) {
+      return false;
+    }
+
+    List<ContainerStatus> containerStatusList = 
driver.getStatus().getContainerStatuses();
+
+    // If there's only one container in given pod, evaluate it
+    // Otherwise, use the provided name as filter.
+    if (containerStatusList.size() == 1) {
+      return containerStatusList.get(0).getReady();
+    }
+
+    return findDriverMainContainerStatus(spec, containerStatusList).stream()
+        .anyMatch(ContainerStatus::getReady);
+  }
+
+  /** Returns true if the given container has terminated */
+  public static boolean isContainerTerminated(final ContainerStatus 
containerStatus) {
+    return containerStatus != null
+        && containerStatus.getState() != null
+        && containerStatus.getState().getTerminated() != null;
+  }
+
+  /** Returns true if the given container has ever restarted */
+  public static boolean isContainerRestarted(final ContainerStatus 
containerStatus) {
+    return containerStatus != null && containerStatus.getRestartCount() > 0;
+  }
+
+  /** Returns true if the given container has exited with non-zero status */
+  public static boolean isContainerFailed(final ContainerStatus 
containerStatus) {
+    return isContainerTerminated(containerStatus)
+        && containerStatus.getState().getTerminated().getExitCode() > 0;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ProbeUtil.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ProbeUtil.java
new file mode 100644
index 0000000..4ac1fd7
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ProbeUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+import com.sun.net.httpserver.HttpExchange;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RuntimeInfo;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ProbeUtil {
+  public static void sendMessage(HttpExchange httpExchange, int code, String 
message)
+      throws IOException {
+    try (OutputStream outputStream = httpExchange.getResponseBody()) {
+      byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
+      httpExchange.sendResponseHeaders(code, bytes.length);
+      outputStream.write(bytes);
+      outputStream.flush();
+    }
+  }
+
+  public static Optional<Boolean> areOperatorsStarted(List<Operator> 
operators) {
+    return operators.stream()
+        .map(
+            operator -> {
+              RuntimeInfo runtimeInfo = operator.getRuntimeInfo();
+              if (runtimeInfo != null) {
+                if (!operator.getRuntimeInfo().isStarted()) {
+                  log.error("Operator is not running");
+                  return false;
+                }
+                return true;
+              }
+              return false;
+            })
+        .reduce((a, b) -> a && b);
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
new file mode 100644
index 0000000..c3491d1
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import org.apache.spark.k8s.operator.Constants;
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+
+/** Handy utils for create & manage Application Status */
+public class SparkAppStatusUtils {
+
+  public static boolean isValidApplicationStatus(SparkApplication app) {
+    // null check
+    return app.getStatus() != null
+        && app.getStatus().getCurrentState() != null
+        && app.getStatus().getCurrentState().getCurrentStateSummary() != null;
+  }
+
+  public static ApplicationState driverUnexpectedRemoved() {
+    return new ApplicationState(
+        ApplicationStateSummary.Failed, 
Constants.DRIVER_UNEXPECTED_REMOVED_MESSAGE);
+  }
+
+  public static ApplicationState driverLaunchTimedOut() {
+    return new ApplicationState(
+        ApplicationStateSummary.DriverStartTimedOut, 
Constants.DRIVER_LAUNCH_TIMEOUT_MESSAGE);
+  }
+
+  public static ApplicationState driverReadyTimedOut() {
+    return new ApplicationState(
+        ApplicationStateSummary.DriverReadyTimedOut, 
Constants.DRIVER_LAUNCH_TIMEOUT_MESSAGE);
+  }
+
+  public static ApplicationState executorLaunchTimedOut() {
+    return new ApplicationState(
+        ApplicationStateSummary.ExecutorsStartTimedOut, 
Constants.EXECUTOR_LAUNCH_TIMEOUT_MESSAGE);
+  }
+
+  public static ApplicationState appCancelled() {
+    return new ApplicationState(
+        ApplicationStateSummary.ResourceReleased, 
Constants.APP_CANCELLED_MESSAGE);
+  }
+
+  public static boolean hasReachedState(
+      SparkApplication application, ApplicationState stateToCheck) {
+    if (!isValidApplicationStatus(application)) {
+      return false;
+    }
+    return 
application.getStatus().getStateTransitionHistory().keySet().parallelStream()
+        .anyMatch(
+            stateId ->
+                stateToCheck.equals(
+                    
application.getStatus().getStateTransitionHistory().get(stateId)));
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkExceptionUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkExceptionUtils.java
new file mode 100644
index 0000000..a7be937
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkExceptionUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+public class SparkExceptionUtils {
+  public static boolean 
isConflictForExistingResource(KubernetesClientException e) {
+    return e != null
+        && e.getCode() == 409
+        && e.getStatus() != null
+        && StringUtils.isNotEmpty(e.getStatus().toString())
+        && e.getStatus().toString().toLowerCase().contains("alreadyexists");
+  }
+
+  public static String buildGeneralErrorMessage(Exception e) {
+    return ExceptionUtils.getStackTrace(e);
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
new file mode 100644
index 0000000..aae6df9
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.apache.spark.k8s.operator.Constants.LABEL_RESOURCE_NAME;
+import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_OPERATOR_NAME;
+import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VALUE;
+import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.k8s.operator.Constants;
+import org.apache.spark.k8s.operator.SparkApplication;
+
+public class Utils {
+  public static Set<String> sanitizeCommaSeparatedStrAsSet(String str) {
+    if (StringUtils.isBlank(str)) {
+      return Collections.emptySet();
+    }
+    if ("*".equals(str)) {
+      return Collections.emptySet();
+    }
+    return Arrays.stream(str.split(","))
+        .map(String::trim)
+        .filter(StringUtils::isNotBlank)
+        .collect(Collectors.toSet());
+  }
+
+  public static String labelsAsStr(Map<String, String> labels) {
+    return labels.entrySet().stream()
+        .map(e -> String.join("=", e.getKey(), e.getValue()))
+        .collect(Collectors.joining(","));
+  }
+
+  public static Map<String, String> commonOperatorResourceLabels() {
+    Map<String, String> labels = new HashMap<>();
+    labels.put(LABEL_RESOURCE_NAME, OPERATOR_APP_NAME.getValue());
+    return labels;
+  }
+
+  public static Map<String, String> defaultOperatorConfigLabels() {
+    Map<String, String> labels = new HashMap<>(commonOperatorResourceLabels());
+    labels.put("app.kubernetes.io/component", 
"operator-dynamic-config-overrides");
+    return labels;
+  }
+
+  public static Map<String, String> commonManagedResourceLabels() {
+    Map<String, String> labels = new HashMap<>();
+    labels.put(LABEL_SPARK_OPERATOR_NAME, OPERATOR_APP_NAME.getValue());
+    return labels;
+  }
+
+  public static Map<String, String> sparkAppResourceLabels(final 
SparkApplication app) {
+    return sparkAppResourceLabels(app.getMetadata().getName());
+  }
+
+  public static Map<String, String> sparkAppResourceLabels(final String 
appName) {
+    Map<String, String> labels = commonManagedResourceLabels();
+    labels.put(Constants.LABEL_SPARK_APPLICATION_NAME, appName);
+    return labels;
+  }
+
+  public static Map<String, String> driverLabels(final SparkApplication 
sparkApplication) {
+    Map<String, String> labels = sparkAppResourceLabels(sparkApplication);
+    labels.put(Constants.LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_DRIVER_VALUE);
+    return labels;
+  }
+
+  public static Map<String, String> executorLabels(final SparkApplication 
sparkApplication) {
+    Map<String, String> labels = sparkAppResourceLabels(sparkApplication);
+    labels.put(Constants.LABEL_SPARK_ROLE_NAME, 
LABEL_SPARK_ROLE_EXECUTOR_VALUE);
+    return labels;
+  }
+
+  public static Set<String> getWatchedNamespaces() {
+    return 
Utils.sanitizeCommaSeparatedStrAsSet(OPERATOR_WATCHED_NAMESPACES.getValue());
+  }
+
+  /**
+   * Labels to be applied to all created resources, as a comma-separated string
+   *
+   * @return labels string
+   */
+  public static String commonResourceLabelsStr() {
+    return labelsAsStr(commonManagedResourceLabels());
+  }
+}
diff --git a/spark-operator/src/main/resources/EcsLayout.json 
b/spark-operator/src/main/resources/EcsLayout.json
new file mode 100644
index 0000000..4660280
--- /dev/null
+++ b/spark-operator/src/main/resources/EcsLayout.json
@@ -0,0 +1,49 @@
+{
+  "@timestamp": {
+    "$resolver": "timestamp",
+    "pattern": {
+      "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
+      "timeZone": "UTC"
+    }
+  },
+  "ecs.version": "1.6.0",
+  "log.level": {
+    "$resolver": "level",
+    "field": "name"
+  },
+  "message": {
+    "$resolver": "message",
+    "stringified": true
+  },
+  "process.thread.name": {
+    "$resolver": "thread",
+    "field": "name"
+  },
+  "log.logger": {
+    "$resolver": "logger",
+    "field": "name"
+  },
+  "labels": {
+    "$resolver": "mdc",
+    "flatten": true,
+    "stringified": true
+  },
+  "tags": {
+    "$resolver": "ndc"
+  },
+  "error.type": {
+    "$resolver": "exception",
+    "field": "className"
+  },
+  "error.message": {
+    "$resolver": "exception",
+    "field": "message"
+  },
+  "error.stack_trace": {
+    "$resolver": "exception",
+    "field": "stackTrace",
+    "stackTrace": {
+      "stringified": true
+    }
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactoryTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactoryTest.java
new file mode 100644
index 0000000..a9a341b
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemFactoryTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static 
org.apache.spark.k8s.operator.metrics.MetricsSystemFactory.parseSinkProperties;
+import static org.junit.Assert.assertThrows;
+
+import java.util.Properties;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class MetricsSystemFactoryTest {
+
+  @Test
+  void testMetricsSystemFailFastWithNoClassFullName() {
+    Properties properties = new Properties();
+    properties.put("sink.mocksink.period", "10");
+    properties.put("sink.console.class", 
"org.apache.spark.metrics.sink.ConsoleSink");
+    RuntimeException e =
+        assertThrows(RuntimeException.class, () -> 
parseSinkProperties(properties));
+    Assertions.assertEquals(
+        "mocksink provides properties, but does not provide full class name", 
e.getMessage());
+  }
+
+  @Test
+  void testMetricsSystemFailFastWithNotFoundClassName() {
+    Properties properties = new Properties();
+    properties.put("sink.console.class", 
"org.apache.spark.metrics.sink.FooSink");
+    RuntimeException e =
+        assertThrows(RuntimeException.class, () -> 
parseSinkProperties(properties));
+    Assertions.assertEquals(
+        "Fail to find class org.apache.spark.metrics.sink.FooSink", 
e.getMessage());
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemTest.java
new file mode 100644
index 0000000..5bfccc8
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/MetricsSystemTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.metrics.sink.MockSink;
+import org.apache.spark.metrics.sink.Sink;
+import org.apache.spark.metrics.source.Source;
+
+class MetricsSystemTest {
+  @Test
+  void testMetricsSystemWithResourcesAdd() {
+    MetricsSystem metricsSystem = new MetricsSystem();
+    Set<Source> sourcesList = metricsSystem.getSources();
+    Set<Sink> sinks = metricsSystem.getSinks();
+    metricsSystem.start();
+    Assertions.assertEquals(1, sourcesList.size());
+    // By default, only prometheus sink is enabled
+    Assertions.assertEquals(1, sinks.size());
+    Assertions.assertFalse(metricsSystem.getRegistry().getMetrics().isEmpty());
+    metricsSystem.stop();
+    Assertions.assertTrue(metricsSystem.getRegistry().getMetrics().isEmpty());
+  }
+
+  @Test
+  void testMetricsSystemWithCustomizedSink() {
+    Properties properties = new Properties();
+    properties.put("sink.mocksink.class", 
"org.apache.spark.k8s.operator.metrics.sink.MockSink");
+    properties.put("sink.mocksink.period", "10");
+    MetricsSystem metricsSystem = new MetricsSystem(properties);
+    metricsSystem.start();
+    Assertions.assertEquals(2, metricsSystem.getSinks().size());
+    Optional<Sink> mockSinkOptional =
+        metricsSystem.getSinks().stream().filter(sink -> sink instanceof 
MockSink).findFirst();
+    Assertions.assertTrue(mockSinkOptional.isPresent());
+    Sink mockSink = mockSinkOptional.get();
+    metricsSystem.stop();
+    MockSink sink = (MockSink) mockSink;
+    Assertions.assertEquals(sink.getPollPeriod(), 10);
+    Assertions.assertEquals(sink.getTimeUnit(), TimeUnit.SECONDS);
+  }
+
+  @Test
+  void testMetricsSystemWithTwoSinkConfigurations() {
+    Properties properties = new Properties();
+    properties.put("sink.mocksink.class", 
"org.apache.spark.k8s.operator.metrics.sink.MockSink");
+    properties.put("sink.mocksink.period", "10");
+    properties.put("sink.console.class", 
"org.apache.spark.metrics.sink.ConsoleSink");
+    MetricsSystem metricsSystem = new MetricsSystem(properties);
+    metricsSystem.start();
+    Assertions.assertEquals(3, metricsSystem.getSinks().size());
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/sink/MockSink.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/sink/MockSink.java
new file mode 100644
index 0000000..2026d9c
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/sink/MockSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.sink;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.metrics.sink.Sink;
+
+@SuppressWarnings("PMD")
+public class MockSink implements Sink {
+  private static final Logger logger = LoggerFactory.getLogger(MockSink.class);
+  private Properties properties;
+  private MetricRegistry metricRegistry;
+  public static final String DEFAULT_UNIT = "SECONDS";
+  public static final int DEFAULT_PERIOD = 20;
+  public static final String KEY_PERIOD = "period";
+  public static final String KEY_UNIT = "unit";
+
+  public int getPollPeriod() {
+    return Integer.parseInt((String) properties.getOrDefault(KEY_PERIOD, 
DEFAULT_PERIOD));
+  }
+
+  public TimeUnit getTimeUnit() {
+    return TimeUnit.valueOf((String) properties.getOrDefault(KEY_UNIT, 
DEFAULT_UNIT));
+  }
+
+  public MockSink(Properties properties, MetricRegistry metricRegistry) {
+    logger.info("Current properties: {}", properties);
+    this.properties = properties;
+    this.metricRegistry = metricRegistry;
+  }
+
+  @Override
+  public void start() {
+    logger.info("Mock sink started");
+  }
+
+  @Override
+  public void stop() {
+    logger.info("Mock sink stopped");
+  }
+
+  @Override
+  public void report() {
+    logger.info("Mock sink reported");
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
new file mode 100644
index 0000000..bd34e33
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import okhttp3.Interceptor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+
+@EnableKubernetesMockClient(crud = true)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@SuppressWarnings("PMD")
+@SuppressFBWarnings(
+    value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD", "UUF_UNUSED_FIELD"},
+    justification = "Unwritten fields are covered by Kubernetes mock client")
+class KubernetesMetricsInterceptorTest {
+
+  private KubernetesMockServer mockServer;
+  private KubernetesClient kubernetesClient;
+
+  @AfterEach
+  void cleanUp() {
+    mockServer.reset();
+  }
+
+  @Test
+  @Order(1)
+  void testMetricsEnabled() {
+    KubernetesMetricsInterceptor metricsInterceptor = new 
KubernetesMetricsInterceptor();
+    List<Interceptor> interceptors = 
Collections.singletonList(metricsInterceptor);
+    KubernetesClient client =
+        KubernetesClientFactory.buildKubernetesClient(
+            interceptors, kubernetesClient.getConfiguration());
+    SparkApplication sparkApplication = createSparkApplication();
+    ConfigMap configMap = createConfigMap();
+
+    Map<String, Metric> metrics = new 
HashMap<>(metricsInterceptor.metricRegistry().getMetrics());
+    Assertions.assertEquals(9, metrics.size());
+    client.resource(sparkApplication).create();
+    client.resource(configMap).get();
+    Map<String, Metric> metrics2 = new 
HashMap<>(metricsInterceptor.metricRegistry().getMetrics());
+    Assertions.assertEquals(17, metrics2.size());
+    List<String> expectedMetricsName =
+        Arrays.asList(
+            "http.response.201",
+            "http.request.post",
+            "sparkapplications.post",
+            "spark-test.sparkapplications.post",
+            "spark-test.sparkapplications.post",
+            "configmaps.get",
+            "spark-system.configmaps.get",
+            "2xx",
+            "4xx");
+    expectedMetricsName.stream()
+        .forEach(
+            name -> {
+              Meter metric = (Meter) metrics2.get(name);
+              Assertions.assertEquals(metric.getCount(), 1);
+            });
+    client.resource(sparkApplication).delete();
+  }
+
+  @Test
+  @Order(2)
+  void testWhenKubernetesServerNotWorking() {
+    KubernetesMetricsInterceptor metricsInterceptor = new 
KubernetesMetricsInterceptor();
+    List<Interceptor> interceptors = 
Collections.singletonList(metricsInterceptor);
+    KubernetesClient client =
+        KubernetesClientFactory.buildKubernetesClient(
+            interceptors, kubernetesClient.getConfiguration());
+    int retry = client.getConfiguration().getRequestRetryBackoffLimit();
+    mockServer.shutdown();
+    SparkApplication sparkApplication = createSparkApplication();
+    assertThrows(
+        Exception.class,
+        () -> {
+          client.resource(sparkApplication).create();
+        });
+
+    Map<String, Metric> map = metricsInterceptor.metricRegistry().getMetrics();
+    Assertions.assertEquals(12, map.size());
+    Meter metric = (Meter) map.get("failed");
+    Assertions.assertEquals(metric.getCount(), retry + 1);
+  }
+
+  private static SparkApplication createSparkApplication() {
+    ObjectMeta meta = new ObjectMeta();
+    meta.setName("sample-spark-application");
+    meta.setNamespace("spark-test");
+    SparkApplication sparkApplication = new SparkApplication();
+    sparkApplication.setMetadata(meta);
+    ApplicationSpec applicationSpec = new ApplicationSpec();
+    applicationSpec.setMainClass("org.apache.spark.examples.SparkPi");
+    
applicationSpec.setJars("local:///opt/spark/examples/jars/spark-examples.jar");
+    applicationSpec.setSparkConf(
+        Map.of(
+            "spark.executor.instances", "5",
+            "spark.kubernetes.container.image", "spark",
+            "spark.kubernetes.namespace", "spark-test",
+            "spark.kubernetes.authenticate.driver.serviceAccountName", 
"spark"));
+    sparkApplication.setSpec(applicationSpec);
+    return sparkApplication;
+  }
+
+  private static ConfigMap createConfigMap() {
+    ObjectMeta meta = new ObjectMeta();
+    meta.setName("spark-job-operator-configuration");
+    meta.setNamespace("spark-system");
+    ConfigMap configMap = new ConfigMap();
+    configMap.setMetadata(meta);
+    return configMap;
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetricsTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetricsTest.java
new file mode 100644
index 0000000..5fab771
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetricsTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import java.util.Map;
+
+import com.codahale.metrics.Metric;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import 
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
+import 
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+
+class OperatorJosdkMetricsTest {
+  private static final String DEFAULT_NAMESPACE = "default";
+  private static final String TEST_RESOURCE_NAME = "test1";
+  private static final ResourceID resourceId = new ResourceID("spark-pi", 
"testns");
+
+  private static final Map<String, Object> metadata =
+      Map.of(
+          Constants.RESOURCE_GVK_KEY,
+          GroupVersionKind.gvkFor(SparkApplication.class),
+          Constants.CONTROLLER_NAME,
+          "test-controller-name");
+  private static final String controllerName = "test-controller";
+
+  private OperatorJosdkMetrics operatorMetrics;
+
+  @BeforeEach
+  public void setup() {
+    operatorMetrics = new OperatorJosdkMetrics();
+  }
+
+  @Test
+  void testTimeControllerExecution() throws Exception {
+    TestingExecutionBase<SparkApplication> successExecution = new 
TestingExecutionBase<>();
+    operatorMetrics.timeControllerExecution(successExecution);
+    Map<String, Metric> metrics = 
operatorMetrics.metricRegistry().getMetrics();
+    Assertions.assertEquals(4, metrics.size());
+    
Assertions.assertTrue(metrics.containsKey("sparkapplication.test-controller.reconcile.both"));
+    Assertions.assertTrue(
+        
metrics.containsKey("sparkapplication.testns.test-controller.reconcile.both"));
+    Assertions.assertTrue(
+        
metrics.containsKey("sparkapplication.test-controller.reconcile.success.both"));
+    Assertions.assertTrue(
+        
metrics.containsKey("sparkapplication.testns.test-controller.reconcile.success.both"));
+
+    FooTestingExecutionBase<SparkApplication> failedExecution = new 
FooTestingExecutionBase<>();
+    try {
+      operatorMetrics.timeControllerExecution(failedExecution);
+    } catch (Exception e) {
+      Assertions.assertEquals(e.getMessage(), "Foo exception");
+      Assertions.assertEquals(8, metrics.size());
+      Assertions.assertTrue(
+          
metrics.containsKey("sparkapplication.test-controller.reconcile.failure"));
+      Assertions.assertTrue(
+          metrics.containsKey(
+              "sparkapplication.test-controller.reconcile.failure.exception"
+                  + ".nosuchfieldexception"));
+      Assertions.assertTrue(
+          
metrics.containsKey("sparkapplication.testns.test-controller.reconcile.failure"));
+      Assertions.assertTrue(
+          metrics.containsKey(
+              "sparkapplication.testns.test-controller.reconcile.failure."
+                  + "exception.nosuchfieldexception"));
+    }
+  }
+
+  @Test
+  void testReconciliationFinished() {
+    operatorMetrics.finishedReconciliation(buildNamespacedResource(), 
metadata);
+    Map<String, Metric> metrics = 
operatorMetrics.metricRegistry().getMetrics();
+    Assertions.assertEquals(2, metrics.size());
+    
Assertions.assertTrue(metrics.containsKey("configmap.default.reconciliation.finished"));
+    
Assertions.assertTrue(metrics.containsKey("configmap.reconciliation.finished"));
+  }
+
+  @Test
+  void testReconciliationExecutionStartedAndFinished() {
+    operatorMetrics.reconciliationExecutionStarted(buildNamespacedResource(), 
metadata);
+    Map<String, Metric> metrics = 
operatorMetrics.metricRegistry().getMetrics();
+    Assertions.assertEquals(2, metrics.size());
+    Assertions.assertTrue(
+        
metrics.containsKey("configmap.test-controller-name.reconciliations.executions"));
+    Assertions.assertTrue(
+        
metrics.containsKey("configmap.default.test-controller-name.reconciliations.executions"));
+    operatorMetrics.reconciliationExecutionFinished(buildNamespacedResource(), 
metadata);
+    Assertions.assertEquals(3, metrics.size());
+    Assertions.assertTrue(
+        
metrics.containsKey("configmap.test-controller-name.reconciliations.queue.size"));
+  }
+
+  @Test
+  void testReceivedEvent() {
+    Event event = new ResourceEvent(ResourceAction.ADDED, resourceId, 
buildNamespacedResource());
+    operatorMetrics.receivedEvent(event, metadata);
+    Map<String, Metric> metrics = 
operatorMetrics.metricRegistry().getMetrics();
+    Assertions.assertEquals(2, metrics.size());
+    
Assertions.assertTrue(metrics.containsKey("sparkapplication.added.resource.event"));
+    
Assertions.assertTrue(metrics.containsKey("sparkapplication.testns.added.resource.event"));
+  }
+
+  private static class TestingExecutionBase<T> implements 
Metrics.ControllerExecution<T> {
+    @Override
+    public String controllerName() {
+      return controllerName;
+    }
+
+    @Override
+    public String successTypeName(Object o) {
+      return "both";
+    }
+
+    @Override
+    public ResourceID resourceID() {
+      return resourceId;
+    }
+
+    @Override
+    public Map<String, Object> metadata() {
+      return metadata;
+    }
+
+    @Override
+    public String name() {
+      return "reconcile";
+    }
+
+    @Override
+    public T execute() throws Exception {
+      Thread.sleep(1000);
+      return null;
+    }
+  }
+
+  private static class FooTestingExecutionBase<T> implements 
Metrics.ControllerExecution<T> {
+    @Override
+    public String controllerName() {
+      return controllerName;
+    }
+
+    @Override
+    public String successTypeName(Object o) {
+      return "resource";
+    }
+
+    @Override
+    public ResourceID resourceID() {
+      return resourceId;
+    }
+
+    @Override
+    public Map<String, Object> metadata() {
+      return metadata;
+    }
+
+    @Override
+    public String name() {
+      return "reconcile";
+    }
+
+    @Override
+    public T execute() throws Exception {
+      throw new NoSuchFieldException("Foo exception");
+    }
+  }
+
+  private HasMetadata buildNamespacedResource() {
+    ConfigMap cm = new ConfigMap();
+    cm.setMetadata(
+        new ObjectMetaBuilder()
+            .withName(TEST_RESOURCE_NAME)
+            .withNamespace(DEFAULT_NAMESPACE)
+            .build());
+    return cm;
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
new file mode 100644
index 0000000..59b3989
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.apache.spark.k8s.operator.Constants.API_GROUP;
+import static org.apache.spark.k8s.operator.Constants.API_VERSION;
+
+import java.io.File;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+
+import org.apache.spark.k8s.operator.Constants;
+import org.apache.spark.k8s.operator.SparkApplication;
+
+public class TestUtils {
+  public static SparkApplication createMockApp(String namespace) {
+    SparkApplication cr = new SparkApplication();
+    cr.setKind("SparkApplication");
+    cr.setApiVersion(String.join("/", API_GROUP, API_VERSION));
+    cr.setSpec(cr.initSpec());
+    ObjectMeta meta = new ObjectMeta();
+    meta.setGeneration(0L);
+    meta.setLabels(Map.of(Constants.LABEL_SENTINEL_RESOURCE, "true"));
+    meta.setName("sentinel");
+    meta.setNamespace(namespace);
+    cr.setMetadata(meta);
+    return cr;
+  }
+
+  public static void cleanPropertiesFile(String filePath) {
+    File myObj = new File(filePath);
+    if (!myObj.delete()) {
+      throw new RuntimeException("Failed to clean properties file: " + 
filePath);
+    }
+  }
+
+  public static boolean notTimedOut(long startTime, long maxWaitTimeInMills) {
+    long elapsedTimeInMills = calculateElapsedTimeInMills(startTime);
+    return elapsedTimeInMills < maxWaitTimeInMills;
+  }
+
+  public static long calculateElapsedTimeInMills(long startTime) {
+    return System.currentTimeMillis() - startTime;
+  }
+}
diff --git a/spark-operator/src/test/resources/log4j2.properties 
b/spark-operator/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..53b4e9a
--- /dev/null
+++ b/spark-operator/src/test/resources/log4j2.properties
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+status=info
+strict=true
+dest=out
+name=PropertiesConfig
+property.filename=/tmp/spark-operator
+filter.threshold.type=ThresholdFilter
+filter.threshold.level=warn
+# console
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d %p %X %C{1.} [%t] %m%n
+appender.console.filter.threshold.type=ThresholdFilter
+appender.console.filter.threshold.level=info
+# rolling JSON
+appender.rolling.type=RollingFile
+appender.rolling.name=RollingFile
+appender.rolling.append=true
+appender.rolling.fileName=${filename}.log
+appender.rolling.filePattern=${filename}-%i.log.gz
+appender.rolling.layout.type=JsonTemplateLayout
+appender.rolling.layout.eventTemplateUri=classpath:EcsLayout.json
+appender.rolling.policies.type=Policies
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100MB
+appender.rolling.strategy.type=DefaultRolloverStrategy
+appender.rolling.strategy.max=20
+appender.rolling.immediateFlush=true
+# chatty loggers
+rootLogger.level=all
+logger.netty.name=io.netty
+logger.netty.level=warn
+log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
+rootLogger.appenderRef.stdout.ref=STDOUT
+rootLogger.appenderRef.rolling.ref=RollingFile


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to