morhidi commented on code in PR #268:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896644897


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -70,37 +70,36 @@ public class FlinkOperator {
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
+    private final Metrics metrics;
+
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.metrics = new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP));
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(metrics);

Review Comment:
   I added the following to the code, and nothing else:
   
   ```
   overrider.withMetrics(new Metrics() {
                   @Override
                   public void receivedEvent(Event event) {
                       LOG.info("receivedEvent() called");
                   }
   
                   @Override
                   public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfo) {
                       LOG.info("reconcileCustomResource() called");
                   }
   
                   @Override
                   public void failedReconciliation(ResourceID resourceID, Exception exception) {
                       LOG.info("failedReconciliation() called");
                   }
   
                   @Override
                   public void cleanupDoneFor(ResourceID resourceID) {
                       LOG.info("cleanupDoneFor() called");
                   }
   
                   @Override
                   public void finishedReconciliation(ResourceID resourceID) {
                       LOG.info("finishedReconciliation() called");
                   }
   
               });
   ```
   
   and the logs show that it is being called:
   
   ```
   2022-06-14 12:16:07,480 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-cluster] receivedEvent() called
   2022-06-14 12:16:07,481 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,592 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] reconcileCustomResource() called
   2022-06-14 12:16:07,593 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,605 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,607 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] reconcileCustomResource() called
   2022-06-14 12:16:07,608 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,622 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,662 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,686 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,778 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,799 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   ```
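
For a check that does not rely on reading logs, the same idea can be sketched by counting callback invocations instead of logging them. This is only an illustration under stated assumptions: `CallbackListener` and `CountingListener` are hypothetical stand-ins, not the Java Operator SDK `Metrics` interface, whose callbacks take `Event`, `ResourceID`, and `RetryInfo` arguments rather than plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Small demo driver: fires a few callbacks by hand and prints the counts. */
class CallbackCountDemo {
    public static void main(String[] args) {
        CountingListener listener = new CountingListener();
        listener.receivedEvent("default.basic-session-job-example");
        listener.reconcileCustomResource("default.basic-session-job-example");
        listener.receivedEvent("default.basic-session-job-example");
        System.out.println("receivedEvent: " + listener.count("receivedEvent"));
        System.out.println("reconcileCustomResource: " + listener.count("reconcileCustomResource"));
    }
}

/** Hypothetical, simplified callback interface (assumption: not the real SDK type). */
interface CallbackListener {
    void receivedEvent(String resourceId);

    void reconcileCustomResource(String resourceId);

    void finishedReconciliation(String resourceId);
}

/** Counts how often each callback fires, keyed by callback name. */
class CountingListener implements CallbackListener {
    // LongAdder and ConcurrentHashMap keep the counters safe to update
    // from several reconciliation threads at once.
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    private void hit(String callback) {
        counts.computeIfAbsent(callback, key -> new LongAdder()).increment();
    }

    @Override
    public void receivedEvent(String resourceId) {
        hit("receivedEvent");
    }

    @Override
    public void reconcileCustomResource(String resourceId) {
        hit("reconcileCustomResource");
    }

    @Override
    public void finishedReconciliation(String resourceId) {
        hit("finishedReconciliation");
    }

    /** Returns the number of invocations seen for a callback, or 0 if it never fired. */
    long count(String callback) {
        LongAdder adder = counts.get(callback);
        return adder == null ? 0L : adder.sum();
    }
}
```

A test could then assert on `count("receivedEvent")` rather than scanning log output, and a callback that never fires simply reports 0.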


