gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1435156526


##########
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##########
@@ -17,43 +17,88 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
 
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation)
             throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
 
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            setSessionTargetLabel(sessionJob);

Review Comment:
   I mean `setSessionTargetLabel(sessionJob);`  should be in the default mutator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to