Aitozi commented on code in PR #239:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r882726664
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -63,7 +63,7 @@ public class FlinkConfigManager {
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
private final LoadingCache<Key, Configuration> cache;
- private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+ private final Set<String> namespaces = EnvUtils.getWatchedNamespaces();
Review Comment:
Just to confirm: the dynamic namespace have not enabled right? Do we have
to create a follow-up ticket for it.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java:
##########
@@ -131,90 +116,30 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob,
Context context) {
}
@Override
- public Optional<FlinkSessionJob> updateErrorStatus(
- FlinkSessionJob flinkSessionJob, RetryInfo retryInfo,
RuntimeException e) {
- return ReconciliationUtils.updateErrorStatus(
- flinkSessionJob, retryInfo, e, metricManager, statusHelper);
+ public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
+ FlinkSessionJob sessionJob, Context<FlinkSessionJob> context,
Exception e) {
+ return ReconciliationUtils.toErrorStatusUpdateControl(
+ sessionJob, context.getRetryInfo(), e, metricManager,
statusHelper);
}
@Override
- public List<EventSource> prepareEventSources(
- EventSourceContext<FlinkSessionJob> eventSourceContext) {
- if (effectiveNamespaces.isEmpty()) {
- return List.of(
- createFlinkDepInformerEventSource(
-
kubernetesClient.resources(FlinkDeployment.class).inAnyNamespace(),
- OperatorUtils.ALL_NAMESPACE));
- } else {
- return effectiveNamespaces.stream()
- .map(
- name ->
- createFlinkDepInformerEventSource(
- kubernetesClient
-
.resources(FlinkDeployment.class)
- .inNamespace(name),
- name))
- .collect(Collectors.toList());
- }
- }
-
- private InformerEventSource<FlinkDeployment, FlinkSessionJob>
createFlinkDepInformerEventSource(
- FilterWatchListDeletable<FlinkDeployment,
KubernetesResourceList<FlinkDeployment>>
- filteredClient,
- String name) {
- return new InformerEventSource<>(
- filteredClient.runnableInformer(0),
- primaryResourceRetriever(),
- sessionJob ->
- new ResourceID(
- sessionJob.getSpec().getDeploymentName(),
- sessionJob.getMetadata().getNamespace()),
- false) {
- @Override
- public String name() {
- return name;
- }
- };
+ public Map<String, EventSource> prepareEventSources(
+ EventSourceContext<FlinkSessionJob> context) {
+ return EventSourceInitializer.nameEventSources(
+
EventSourceUtils.getFlinkDeploymentInformerEventSource(context));
}
/**
* Mapping the {@link FlinkDeployment} session cluster to {@link
FlinkSessionJob}. It leverages
* the informer indexer.
*
- * @return The {@link PrimaryResourcesRetriever}.
+ * @return {@link SecondaryToPrimaryMapper}.
Review Comment:
This comment is outdated
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java:
##########
@@ -39,21 +43,24 @@ public class InformerManager {
private final KubernetesClient kubernetesClient;
private volatile Map<String, SharedIndexInformer<FlinkSessionJob>>
sessionJobInformers;
private volatile Map<String, SharedIndexInformer<FlinkDeployment>>
flinkDepInformers;
+ public static final String CLUSTER_ID_INDEX = "clusterId_index";
Review Comment:
the static variable should put above the non-static I think
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -199,11 +199,7 @@ protected void observeJmDeployment(
}
private Optional<Deployment> getSecondaryResource(FlinkDeployment
flinkApp, Context context) {
- return context.getSecondaryResource(
- Deployment.class,
-
configManager.getOperatorConfiguration().getWatchedNamespaces().size() > 1
- ? flinkApp.getMetadata().getNamespace()
- : null);
+ return context.getSecondaryResource(Deployment.class);
Review Comment:
unused `flinkApp`, since it is a one line method call, do we still need a
method ?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java:
##########
@@ -39,21 +43,24 @@ public class InformerManager {
private final KubernetesClient kubernetesClient;
private volatile Map<String, SharedIndexInformer<FlinkSessionJob>>
sessionJobInformers;
private volatile Map<String, SharedIndexInformer<FlinkDeployment>>
flinkDepInformers;
+ public static final String CLUSTER_ID_INDEX = "clusterId_index";
public InformerManager(Set<String> watchedNamespaces, KubernetesClient
kubernetesClient) {
this.watchedNamespaces = watchedNamespaces;
this.kubernetesClient = kubernetesClient;
LOG.info(
"Created informer manager with watchedNamespaces: {}",
watchedNamespaces.isEmpty()
- ? "[" + OperatorUtils.ALL_NAMESPACE + "]"
+ ? "[" + Constants.WATCH_ALL_NAMESPACES + "]"
: watchedNamespaces);
}
public SharedIndexInformer<FlinkSessionJob> getSessionJobInformer(String
namespace) {
Review Comment:
It seems not used anymore , do we need to remove this ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]