This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 33ca85ea4b7ec45a35c554cab0fba562160672b2
Author: Matyas Orhidi <[email protected]>
AuthorDate: Fri Jun 3 11:40:46 2022 +0200

    [FLINK-27892] More than 1 secondary resource related to primary
---
 .../operator/utils/EventSourceUtils.java           | 32 +++++++++++++++++++---
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 2fc26b1..4771723 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -69,7 +69,11 @@ public class EventSourceUtils {
         context.getPrimaryCache()
                 .addIndexer(
                         FLINK_DEPLOYMENT_IDX,
-                        flinkDeployment -> List.of(flinkDeployment.getMetadata().getName()));
+                        flinkDeployment ->
+                                List.of(
+                                        compositeKey(
+                                                flinkDeployment.getMetadata().getName(),
+                                                flinkDeployment.getMetadata().getNamespace())));
 
         InformerConfiguration<FlinkSessionJob> configuration =
                 InformerConfiguration.from(FlinkSessionJob.class, context)
@@ -78,7 +82,13 @@ public class EventSourceUtils {
                                         context.getPrimaryCache()
                                                 .byIndex(
                                                         FLINK_DEPLOYMENT_IDX,
-                                                        sessionJob.getSpec().getDeploymentName())
+                                                        compositeKey(
+                                                                sessionJob
+                                                                        .getSpec()
+                                                                        .getDeploymentName(),
+                                                                sessionJob
+                                                                        .getMetadata()
+                                                                        .getNamespace()))
                                                 .stream()
                                                 .map(ResourceID::fromResource)
                                                 .collect(Collectors.toSet()))
@@ -94,7 +104,11 @@ public class EventSourceUtils {
         context.getPrimaryCache()
                 .addIndexer(
                         FLINK_SESSIONJOB_IDX,
-                        sessionJob -> List.of(sessionJob.getSpec().getDeploymentName()));
+                        sessionJob ->
+                                List.of(
+                                        compositeKey(
+                                                sessionJob.getSpec().getDeploymentName(),
+                                                sessionJob.getMetadata().getNamespace())));
 
         InformerConfiguration<FlinkDeployment> configuration =
                 InformerConfiguration.from(FlinkDeployment.class, context)
@@ -103,7 +117,13 @@ public class EventSourceUtils {
                                         context.getPrimaryCache()
                                                 .byIndex(
                                                         FLINK_SESSIONJOB_IDX,
-                                                        flinkDeployment.getMetadata().getName())
+                                                        compositeKey(
+                                                                flinkDeployment
+                                                                        .getMetadata()
+                                                                        .getName(),
+                                                                flinkDeployment
+                                                                        .getMetadata()
+                                                                        .getNamespace()))
                                                 .stream()
                                                 .map(ResourceID::fromResource)
                                                 .collect(Collectors.toSet()))
@@ -113,4 +133,8 @@ public class EventSourceUtils {
 
         return new InformerEventSource<>(configuration, context);
     }
+
+    private static String compositeKey(String name, String namespace) {
+        return String.format("%s_%s", name, namespace);
+    }
 }

Reply via email to