gyfora commented on code in PR #978:
URL: https://github.com/apache/flink-kubernetes-operator/pull/978#discussion_r2080308653
########## helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml: ##########
@@ -1,4 +1,4 @@
-# Generated by Fabric8 CRDGenerator, manual edits might get overwritten!
+helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml# Generated by Fabric8 CRDGenerator, manual edits might get overwritten!

Review Comment:
   What caused this change?

########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ##########
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
                     .defaultValue(Duration.ofMinutes(-1))
                     .withDescription(
                             "How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
+            operatorConfig("event.exception.stacktrace.lines")

Review Comment:
   How about: `events.exceptions.stacktrace-lines`

########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ##########
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
                     .defaultValue(Duration.ofMinutes(-1))
                     .withDescription(
                             "How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
+            operatorConfig("event.exception.stacktrace.lines")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_LIMIT =
+            operatorConfig("event.exception.limit.per.reconciliation")

Review Comment:
   How about: `events.exceptions.limit-per-reconciliation`

########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java: ##########
@@ -17,22 +17,50 @@
 package org.apache.flink.kubernetes.operator.observer.deployment;

+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;

 /** The observer of {@link
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractFlinkDeploymentObserver {

+    /** The cache entry for the last recorded exception timestamp. */
+    public static final class ExceptionCacheEntry {
+        final String jobId;
+        final long lastTimestamp;
+
+        ExceptionCacheEntry(String jobId, long lastTimestamp) {
+            this.jobId = jobId;
+            this.lastTimestamp = lastTimestamp;
+        }
+    }
+
+    @VisibleForTesting
+    final Map<String, ExceptionCacheEntry> lastRecordedExceptionCache = new ConcurrentHashMap<>();

Review Comment:
   The cache key should be `ResourceID` (you can call `ResourceID.fromResource(resource)`), which will be the namespace + name. The job ID is not globally unique, and users can override it arbitrarily. It also doesn't really matter much here, as we have only a single job active per resource.

########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java: ##########
@@ -56,6 +84,149 @@ protected void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
         }
     }

+    @Override
+    protected void observeJobManagerExceptions(FlinkResourceContext<FlinkDeployment> ctx) {

Review Comment:
   I think this logic would be better placed in the `JobStatusObserver`, which is called by both the application observer and the session job observer.
That way it will work for both apps and session jobs, and you don't have to bother with the session clusters.

########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java: ##########
@@ -17,22 +17,50 @@
 package org.apache.flink.kubernetes.operator.observer.deployment;

+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;

 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractFlinkDeploymentObserver {

+    /** The cache entry for the last recorded exception timestamp. */
+    public static final class ExceptionCacheEntry {

Review Comment:
   Also, one other consideration for the cache is that we need a mechanism to remove the resource from it once it is deleted, so the controller cleanup method should remove it from the cache.
Due to this reason, I think we should not have the cache within the observer module; a nicer place for it would be the `FlinkResourceContextFactory`, and then we could expose this extra cached information through the `FlinkResourceContext`. If we make the info mutable, the observer can update it through the context. I know this sounds a bit over-engineered right now, but we actually have some other similar logic scattered around the codebase that we could then standardize on this approach :)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
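For context on the `events.exceptions.stacktrace-lines` naming discussion above: the option caps how many stack trace lines end up in an exception-related Kubernetes event. A minimal plain-Java sketch of that truncation behavior; the class and method names here are illustrative stand-ins, not the PR's actual implementation:

```java
import java.util.Arrays;

// Hypothetical helper showing what the proposed stacktrace-lines option
// controls: capping the number of stack trace lines copied into an event
// message. Not the operator's real code.
public class StackTraceTruncation {

    /** Keep at most maxLines lines of the printed stack trace. */
    static String truncateStackTrace(String stackTrace, int maxLines) {
        String[] lines = stackTrace.split("\n");
        if (lines.length <= maxLines) {
            return stackTrace;
        }
        // Join the first maxLines lines and mark the cut-off.
        return String.join("\n", Arrays.copyOfRange(lines, 0, maxLines)) + "\n...";
    }

    public static void main(String[] args) {
        String trace = "java.lang.RuntimeException: boom\n"
                + "\tat a.b.C.run(C.java:10)\n"
                + "\tat a.b.D.call(D.java:20)\n"
                + "\tat a.b.E.invoke(E.java:30)";
        // The PR's default is 5 lines; 2 is used here just to show truncation.
        System.out.println(truncateStackTrace(trace, 2));
    }
}
```

A small default (the PR uses 5) keeps events readable while still pointing at the root cause frame.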
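The suggestion to key the exception cache by `ResourceID.fromResource(resource)` (namespace + name) rather than job ID can be sketched as follows. `ResourceKey` below is a hypothetical stand-in for Flink's `ResourceID`, and the cache layout only mirrors the PR's `ExceptionCacheEntry` idea; none of this is the operator's actual code:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: namespace + name uniquely identify the custom resource, whereas
// job IDs are user-overridable and not globally unique.
public class ExceptionCacheSketch {

    /** Hypothetical stand-in for the operator's ResourceID. */
    static final class ResourceKey {
        final String namespace;
        final String name;

        ResourceKey(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ResourceKey)) {
                return false;
            }
            ResourceKey other = (ResourceKey) o;
            return namespace.equals(other.namespace) && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namespace, name);
        }
    }

    /** Mirrors the PR's cache entry: last exception timestamp seen for a job. */
    static final class ExceptionCacheEntry {
        final String jobId;
        final long lastTimestamp;

        ExceptionCacheEntry(String jobId, long lastTimestamp) {
            this.jobId = jobId;
            this.lastTimestamp = lastTimestamp;
        }
    }

    static final Map<ResourceKey, ExceptionCacheEntry> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        // Two deployments may run jobs with the same user-set job ID,
        // but their namespace + name keys never collide.
        String sameJobId = "00000000000000000000000000000001";
        cache.put(new ResourceKey("ns-a", "my-app"), new ExceptionCacheEntry(sameJobId, 100L));
        cache.put(new ResourceKey("ns-b", "my-app"), new ExceptionCacheEntry(sameJobId, 200L));
        System.out.println(cache.size());
    }
}
```

Since only one job is active per resource at a time, the resource identity is both sufficient and safer than the job ID as the cache key.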
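The final suggestion above — moving the cache out of the observer into the context factory, exposing a mutable entry through the context, and clearing it from the controller's cleanup path — might look roughly like this. All class and method names are illustrative stand-ins for `FlinkResourceContextFactory` and `FlinkResourceContext`, not the operator's real API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: the factory owns per-resource state, hands it out via the context,
// and removes it when the resource is deleted (avoiding a memory leak).
public class ContextFactorySketch {

    /** Mutable so the observer can update it through the context. */
    static final class ExceptionCacheEntry {
        volatile long lastTimestamp;

        ExceptionCacheEntry(long lastTimestamp) {
            this.lastTimestamp = lastTimestamp;
        }
    }

    /** Hypothetical stand-in for FlinkResourceContext. */
    static final class ResourceContext {
        final ExceptionCacheEntry exceptionCacheEntry;

        ResourceContext(ExceptionCacheEntry entry) {
            this.exceptionCacheEntry = entry;
        }
    }

    /** Hypothetical stand-in for FlinkResourceContextFactory. */
    static final class ResourceContextFactory {
        final Map<String, ExceptionCacheEntry> exceptionCache = new ConcurrentHashMap<>();

        /** Same resource key always yields the same shared cache entry. */
        ResourceContext getContext(String resourceKey) {
            return new ResourceContext(
                    exceptionCache.computeIfAbsent(resourceKey, k -> new ExceptionCacheEntry(0L)));
        }

        /** Called from the controller's cleanup when the resource is deleted. */
        void cleanup(String resourceKey) {
            exceptionCache.remove(resourceKey);
        }
    }

    public static void main(String[] args) {
        ResourceContextFactory factory = new ResourceContextFactory();
        // The observer records progress through the context it was given.
        factory.getContext("ns/my-app").exceptionCacheEntry.lastTimestamp = 42L;
        // Deleting the resource drops its cached state.
        factory.cleanup("ns/my-app");
        System.out.println(factory.exceptionCache.isEmpty());
    }
}
```

Centralizing such per-resource state in one factory gives a single cleanup hook, which is the standardization opportunity the comment alludes to.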