gyfora commented on a change in pull request #112:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841079315
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
.defaultValue(Duration.ofSeconds(10))
.withDescription(
"The timeout for the observer to wait the flink
rest client to return.");
+
+ public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =
Review comment:
I think the client timeout can serve the same purpose. So we can
probably remove this config
##########
File path: helm/flink-kubernetes-operator/templates/flink-operator.yaml
##########
@@ -180,4 +186,4 @@ data:
{{- if index (.Values.flinkDefaultConfiguration) "log4j-console.properties" }}
{{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" |
nindent 4 -}}
{{- end }}
-{{- end }}
+{{- end }}
Review comment:
missing newline at the end
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import
io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
+import
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}.
*/
+@ControllerConfiguration
+public class FlinkSessionJobController
+ implements
io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+ ErrorStatusHandler<FlinkSessionJob>,
+ EventSourceInitializer<FlinkSessionJob> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkSessionJobController.class);
+ private static final String CLUSTER_ID_INDEX = "clusterId_index";
+ private static final String ALL_NAMESPACE = "allNamespace";
+
+ private final KubernetesClient kubernetesClient;
+
+ private final FlinkResourceValidator<FlinkSessionJob> validator;
+ private final Reconciler<FlinkSessionJob> reconciler;
+ private final Observer<FlinkSessionJob> observer;
+ private final DefaultConfig defaultConfig;
+ private final FlinkOperatorConfiguration operatorConfiguration;
+ private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
+ private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+ public FlinkSessionJobController(
+ DefaultConfig defaultConfig,
+ FlinkOperatorConfiguration operatorConfiguration,
+ KubernetesClient kubernetesClient,
+ FlinkResourceValidator<FlinkSessionJob> validator,
+ Reconciler<FlinkSessionJob> reconciler,
+ Observer<FlinkSessionJob> observer) {
+ this.defaultConfig = defaultConfig;
+ this.operatorConfiguration = operatorConfiguration;
+ this.kubernetesClient = kubernetesClient;
+ this.validator = validator;
+ this.reconciler = reconciler;
+ this.observer = observer;
+ }
+
+ public void init(FlinkControllerConfig<FlinkSessionJob> config) {
+ this.controllerConfig = config;
+ this.informers = createInformers();
+ }
+
+ @Override
+ public UpdateControl<FlinkSessionJob> reconcile(
+ FlinkSessionJob flinkSessionJob, Context context) {
+ FlinkSessionJob originalCopy =
ReconciliationUtils.clone(flinkSessionJob);
+ LOG.info("Starting reconciliation");
+ Optional<String> validationError = validator.validate(flinkSessionJob);
+ if (validationError.isPresent()) {
+ LOG.error("Validation failed: " + validationError.get());
+ ReconciliationUtils.updateForReconciliationError(
+ flinkSessionJob, validationError.get());
+ return ReconciliationUtils.toUpdateControl(originalCopy,
flinkSessionJob);
+ }
Review comment:
For the JobController we have changed the order of validate/observe and
made sure the observer uses the config based on the lastReconcield spec.
Please include this change here as well or at least open ticket to track
this. I think it's okay if we fix it later but we need a ticket to not forget :)
##########
File path:
helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
##########
@@ -24,3 +24,8 @@ appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan}
%highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]}
%msg%n%throwable}
+
+loggers = mine
+
+logger.mine.name = org.apache.flink.runtime.rest.RestClient
+logger.mine.level = DEBUG
Review comment:
Should we remove these before merging?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]