This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 33ca85ea4b7ec45a35c554cab0fba562160672b2 Author: Matyas Orhidi <[email protected]> AuthorDate: Fri Jun 3 11:40:46 2022 +0200 [FLINK-27892] More than 1 secondary resource related to primary --- .../operator/utils/EventSourceUtils.java | 32 +++++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index 2fc26b1..4771723 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -69,7 +69,11 @@ public class EventSourceUtils { context.getPrimaryCache() .addIndexer( FLINK_DEPLOYMENT_IDX, - flinkDeployment -> List.of(flinkDeployment.getMetadata().getName())); + flinkDeployment -> + List.of( + compositeKey( + flinkDeployment.getMetadata().getName(), + flinkDeployment.getMetadata().getNamespace()))); InformerConfiguration<FlinkSessionJob> configuration = InformerConfiguration.from(FlinkSessionJob.class, context) @@ -78,7 +82,13 @@ public class EventSourceUtils { context.getPrimaryCache() .byIndex( FLINK_DEPLOYMENT_IDX, - sessionJob.getSpec().getDeploymentName()) + compositeKey( + sessionJob + .getSpec() + .getDeploymentName(), + sessionJob + .getMetadata() + .getNamespace())) .stream() .map(ResourceID::fromResource) .collect(Collectors.toSet())) @@ -94,7 +104,11 @@ public class EventSourceUtils { context.getPrimaryCache() .addIndexer( FLINK_SESSIONJOB_IDX, - sessionJob -> List.of(sessionJob.getSpec().getDeploymentName())); + sessionJob -> + List.of( + compositeKey( + sessionJob.getSpec().getDeploymentName(), + sessionJob.getMetadata().getNamespace()))); InformerConfiguration<FlinkDeployment> configuration = InformerConfiguration.from(FlinkDeployment.class, context) @@ -103,7 +117,13 @@ public class EventSourceUtils { context.getPrimaryCache() .byIndex( FLINK_SESSIONJOB_IDX, - flinkDeployment.getMetadata().getName()) + compositeKey( + flinkDeployment + .getMetadata() + .getName(), + flinkDeployment + .getMetadata() + .getNamespace())) .stream() .map(ResourceID::fromResource) .collect(Collectors.toSet())) @@ -113,4 +133,8 @@ public class EventSourceUtils { return new InformerEventSource<>(configuration, context); } + + private static String compositeKey(String name, String namespace) { + return String.format("%s_%s", name, namespace); + } }
