gyfora commented on code in PR #978:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/978#discussion_r2080308653


##########
helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml:
##########
@@ -1,4 +1,4 @@
-# Generated by Fabric8 CRDGenerator, manual edits might get overwritten!
+helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml# Generated by Fabric8 CRDGenerator, manual edits might get overwritten!

Review Comment:
   What caused this change?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
                     .defaultValue(Duration.ofMinutes(-1))
                     .withDescription(
                             "How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
+            operatorConfig("event.exception.stacktrace.lines")

Review Comment:
   How about: `events.exceptions.stacktrace-lines`



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
                     .defaultValue(Duration.ofMinutes(-1))
                     .withDescription(
                             "How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
+            operatorConfig("event.exception.stacktrace.lines")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_LIMIT =
+            operatorConfig("event.exception.limit.per.reconciliation")

Review Comment:
   How about: `events.exceptions.limit-per-reconciliation`
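
Purely to illustrate the suggested renames, the user-facing keys would then look like the fragment below (a sketch: the `kubernetes.operator.` prefix is assumed from `operatorConfig(...)`, and the values shown are illustrative examples, not the actual defaults):

```yaml
# Hypothetical resulting config keys if both renames are adopted;
# values below are example settings only.
kubernetes.operator.events.exceptions.stacktrace-lines: 5
kubernetes.operator.events.exceptions.limit-per-reconciliation: 3
```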



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -17,22 +17,50 @@
 
 package org.apache.flink.kubernetes.operator.observer.deployment;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 
 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractFlinkDeploymentObserver {
 
+    /** The cache entry for the last recorded exception timestamp. */
+    public static final class ExceptionCacheEntry {
+        final String jobId;
+        final long lastTimestamp;
+
+        ExceptionCacheEntry(String jobId, long lastTimestamp) {
+            this.jobId = jobId;
+            this.lastTimestamp = lastTimestamp;
+        }
+    }
+
+    @VisibleForTesting
+    final Map<String, ExceptionCacheEntry> lastRecordedExceptionCache = new ConcurrentHashMap<>();

Review Comment:
   The cache key should be the `ResourceID` (you can call `ResourceID.fromResource(resource)`), which is the namespace + name. Job IDs are not globally unique, and users can override them arbitrarily. It also doesn't really matter, as we have only a single job active per resource.
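
A minimal, self-contained sketch of the suggested keying change (the `NsName` record here is a hypothetical stand-in for the operator SDK's `ResourceID.fromResource(resource)`, and `ExceptionCacheEntry` is trimmed down for the example):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExceptionCacheSketch {

    // Hypothetical stand-in for ResourceID: namespace + name identify the resource.
    record NsName(String namespace, String name) {}

    // Trimmed-down version of the PR's cache entry.
    record ExceptionCacheEntry(String jobId, long lastTimestamp) {}

    static final Map<NsName, ExceptionCacheEntry> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        NsName key = new NsName("default", "my-deployment");
        // Only one job is active per resource, so a newer job's entry
        // simply replaces the previous one under the same resource key.
        cache.put(key, new ExceptionCacheEntry("jobA", 100L));
        cache.put(key, new ExceptionCacheEntry("jobB", 200L));
        System.out.println("entries=" + cache.size() + " last=" + cache.get(key).jobId());
    }
}
```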



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -56,6 +84,149 @@ protected void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
         }
     }
 
+    @Override
+    protected void observeJobManagerExceptions(FlinkResourceContext<FlinkDeployment> ctx) {

Review Comment:
   I think this logic would be better placed in the `JobStatusObserver`, which is called by both the Application observer and the SessionJob observer. That way it will work for both applications and session jobs, and you don't have to bother with the session clusters.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -17,22 +17,50 @@
 
 package org.apache.flink.kubernetes.operator.observer.deployment;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 
 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractFlinkDeploymentObserver {
 
+    /** The cache entry for the last recorded exception timestamp. */
+    public static final class ExceptionCacheEntry {

Review Comment:
   Also, one other consideration for the cache: we need a mechanism to remove the resource from it once it is deleted, so the controller cleanup method should remove it from the cache.
   
   For this reason, I think we should not keep the cache within the Observer module; a nicer place for it would be the `FlinkResourceContextFactory`, and then we could expose this extra cached information through the `FlinkResourceContext`. If we make the info mutable, the observer can update it through the context.
   
   I know this sounds a bit over-engineered right now, but we actually have some other similar logic scattered around the codebase that we could then standardize on this approach :)
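
A rough, self-contained sketch of that shape (all names below are hypothetical, not the actual `FlinkResourceContextFactory` API): the factory that hands out per-resource contexts also owns the mutable cached state, so the controller's cleanup path can evict it when the resource is deleted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ContextFactorySketch {

    // Hypothetical stand-in for ResourceID (namespace + name).
    record NsName(String namespace, String name) {}

    static class ResourceContextFactory {
        private final Map<NsName, Long> lastExceptionTs = new ConcurrentHashMap<>();

        // The observer reads and updates the cached value through the context.
        Long getLastExceptionTs(NsName id) {
            return lastExceptionTs.get(id);
        }

        void setLastExceptionTs(NsName id, long ts) {
            lastExceptionTs.put(id, ts);
        }

        // Called from the controller's cleanup method when the resource is deleted,
        // so stale entries don't accumulate for removed deployments.
        void cleanupForDeletedResource(NsName id) {
            lastExceptionTs.remove(id);
        }
    }

    public static void main(String[] args) {
        var factory = new ResourceContextFactory();
        var id = new NsName("default", "my-deployment");
        factory.setLastExceptionTs(id, 1234L);
        factory.cleanupForDeletedResource(id);
        System.out.println("after cleanup: " + factory.getLastExceptionTs(id));
    }
}
```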



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
