This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c60c3c4 [FLINK-37730] Improve exception recording ts initialization + 2.0 compatibility
3c60c3c4 is described below

commit 3c60c3c45e33e16f7c1a26a53b634914f6ca863c
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue May 27 10:34:31 2025 +0200

    [FLINK-37730] Improve exception recording ts initialization + 2.0 compatibility
---
 .gitignore                                         |   2 +
 .../operator/observer/JobStatusObserver.java       | 105 +++---
 .../service/FlinkResourceContextFactory.java       |  20 +-
 .../kubernetes/operator/utils/EventUtils.java      |  26 +-
 .../messages/JobExceptionsInfoWithHistory.java     | 367 +++++++++++++++++++++
 .../kubernetes/operator/TestingFlinkService.java   |   4 -
 .../operator/observer/JobStatusObserverTest.java   |  63 +++-
 ...ErrorTest.java => EventUtilsApiServerTest.java} |  56 +++-
 8 files changed, 568 insertions(+), 75 deletions(-)

diff --git a/.gitignore b/.gitignore
index bb85f4bb..2db0ebc1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,5 @@ buildNumber.properties
 .idea
 *.iml
 *.DS_Store
+
+.kube
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index caf4a32a..0c875613 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
 import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -35,12 +36,12 @@ import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.time.Instant;
-import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeoutException;
@@ -53,6 +54,8 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
     private static final Logger LOG = 
LoggerFactory.getLogger(JobStatusObserver.class);
 
     public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
+    public static final String EXCEPTION_TIMESTAMP = "exception-timestamp";
+    public static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);
 
     protected final EventRecorder eventRecorder;
 
@@ -132,65 +135,77 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
             }
 
             var exceptionHistory = history.getExceptionHistory();
-            List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
-                    exceptionHistory.getEntries();
-            if (exceptions == null || exceptions.isEmpty()) {
-                return;
+            var exceptions = exceptionHistory.getEntries();
+            if (exceptions != null) {
+                exceptions = new ArrayList<>(exceptions);
+                exceptions.sort(
+                        Comparator.comparingLong(
+                                        
JobExceptionsInfoWithHistory.RootExceptionInfo
+                                                ::getTimestamp)
+                                .reversed());
+            } else {
+                exceptions = Collections.emptyList();
             }
 
-            if (exceptionHistory.isTruncated()) {
-                LOG.warn(
-                        "Job exception history is truncated for jobId '{}'. 
Some exceptions may be missing.",
-                        jobId);
+            String currentJobId = jobStatus.getJobId();
+            var cacheEntry = ctx.getExceptionCacheEntry();
+
+            if (!cacheEntry.isInitialized()) {
+                Instant lastExceptionTs;
+                if (exceptions.isEmpty()) {
+                    // If the job has no exceptions yet, set this to MIN so that the next
+                    // exception is always recorded
+                    lastExceptionTs = Instant.MIN;
+                } else {
+                    var k8sExpirationTs = 
Instant.now().minus(MAX_K8S_EVENT_AGE);
+                    var maxJobExceptionTs = 
Instant.ofEpochMilli(exceptions.get(0).getTimestamp());
+                    if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
+                        // If the last job exception was a long time ago, then there is no point in
+                        // checking in k8s. We won't report it as a new exception
+                        lastExceptionTs = maxJobExceptionTs;
+                    } else {
+                        // If there were recent exceptions, we check the 
triggered events from kube
+                        // to make sure we don't double trigger
+                        lastExceptionTs =
+                                EventUtils.findLastJobExceptionTsFromK8s(
+                                                ctx.getKubernetesClient(), 
resource)
+                                        .orElse(k8sExpirationTs);
+                    }
+                }
+
+                cacheEntry.setLastTimestamp(lastExceptionTs);
+                cacheEntry.setInitialized(true);
+                cacheEntry.setJobId(currentJobId);
             }
 
-            String currentJobId = jobStatus.getJobId();
-            Instant lastRecorded = null; // first reconciliation
+            var lastRecorded =
+                    currentJobId.equals(cacheEntry.getJobId())
+                            ? cacheEntry.getLastTimestamp()
+                            : Instant.MIN;
 
-            var cacheEntry = ctx.getExceptionCacheEntry();
-            // a cache entry is created should always be present. The 
timestamp for the first
-            // reconciliation would be
-            // when the job was created. This check is still necessary because 
even though there
-            // might be an entry,
-            // the jobId could have changed since the job was first created.
-            if (cacheEntry.getJobId() != null && 
cacheEntry.getJobId().equals(currentJobId)) {
-                lastRecorded = 
Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
+            if (exceptions.isEmpty()) {
+                return;
             }
 
             int maxEvents = 
operatorConfig.getReportedExceptionEventsMaxCount();
             int maxStackTraceLines = 
operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
 
-            // Sort and reverse to prioritize the newest exceptions
-            var sortedExceptions = new ArrayList<>(exceptions);
-            sortedExceptions.sort(
-                    Comparator.comparingLong(
-                                    
JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
-                            .reversed());
             int count = 0;
-            Instant latestSeen = null;
-
-            for (var exception : sortedExceptions) {
-                Instant exceptionTime = 
Instant.ofEpochMilli(exception.getTimestamp());
-                // Skip already recorded exceptions
-                if (lastRecorded != null && 
!exceptionTime.isAfter(lastRecorded)) {
+            for (var exception : exceptions) {
+                var exceptionTime = 
Instant.ofEpochMilli(exception.getTimestamp());
+                // Stop at the first already-recorded exception or once maxEvents is reached
+                if (!exceptionTime.isAfter(lastRecorded) || count++ >= 
maxEvents) {
                     break;
                 }
                 emitJobManagerExceptionEvent(ctx, exception, exceptionTime, 
maxStackTraceLines);
-                if (latestSeen == null) {
-                    latestSeen = exceptionTime;
-                }
-                if (++count >= maxEvents) {
-                    break;
-                }
             }
 
-            ctx.getExceptionCacheEntry().setJobId(currentJobId);
-            // Set to the timestamp of the latest emitted exception, if any 
were emitted
-            // the other option is that if no exceptions were emitted, we set 
this to now.
-            if (latestSeen != null) {
-                
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
+            if (count > maxEvents) {
+                LOG.warn("Job exception history is truncated. Some exceptions 
may be missing.");
             }
 
+            cacheEntry.setJobId(currentJobId);
+            
cacheEntry.setLastTimestamp(Instant.ofEpochMilli(exceptions.get(0).getTimestamp()));
         } catch (Exception e) {
             LOG.warn("Failed to fetch JobManager exception info.", e);
         }
@@ -203,9 +218,7 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
             int maxStackTraceLines) {
         Map<String, String> annotations = new HashMap<>();
         if (exceptionTime != null) {
-            annotations.put(
-                    "exception-timestamp",
-                    
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
+            annotations.put(EXCEPTION_TIMESTAMP, exceptionTime.toString());
         }
         if (exception.getTaskName() != null) {
             annotations.put("task-name", exception.getTaskName());
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index 8228ffbf..f2f75a52 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -43,6 +43,7 @@ import lombok.Data;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -57,12 +58,8 @@ public class FlinkResourceContextFactory {
     @Data
     public static final class ExceptionCacheEntry {
         private String jobId;
-        private long lastTimestamp;
-
-        public ExceptionCacheEntry(String jobId, long lastTimestamp) {
-            this.jobId = jobId;
-            this.lastTimestamp = lastTimestamp;
-        }
+        private Instant lastTimestamp;
+        private boolean initialized;
     }
 
     @VisibleForTesting
@@ -128,10 +125,7 @@ public class FlinkResourceContextFactory {
                             configManager,
                             this::getFlinkService,
                             lastRecordedExceptionCache.computeIfAbsent(
-                                    resourceId,
-                                    id ->
-                                            new ExceptionCacheEntry(
-                                                    flinkDepJobId, 
System.currentTimeMillis())));
+                                    resourceId, id -> new 
ExceptionCacheEntry()));
         } else if (resource instanceof FlinkSessionJob) {
             var resourceId = ResourceID.fromResource(resource);
             var flinkSessionJobId = jobId;
@@ -143,11 +137,7 @@ public class FlinkResourceContextFactory {
                             configManager,
                             this::getFlinkService,
                             lastRecordedExceptionCache.computeIfAbsent(
-                                    resourceId,
-                                    id ->
-                                            new ExceptionCacheEntry(
-                                                    flinkSessionJobId,
-                                                    
System.currentTimeMillis())));
+                                    resourceId, id -> new 
ExceptionCacheEntry()));
         } else {
             throw new IllegalArgumentException(
                     "Unknown resource type " + 
resource.getClass().getSimpleName());
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index df6232d4..faac4829 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -38,6 +39,7 @@ import java.net.HttpURLConnection;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -283,13 +285,13 @@ public class EventUtils {
         return Optional.empty();
     }
 
-    private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
-        var ref = getObjectReference(pod);
+    private static List<Event> getResourceEvents(KubernetesClient client, 
HasMetadata cr) {
+        var ref = getObjectReference(cr);
 
         var eventList =
                 client.v1()
                         .events()
-                        .inNamespace(pod.getMetadata().getNamespace())
+                        .inNamespace(cr.getMetadata().getNamespace())
                         .withInvolvedObject(ref)
                         .list();
 
@@ -343,7 +345,7 @@ public class EventUtils {
         boolean notReady = checkStatusWasAlways(pod, 
conditionMap.get("Ready"), "False");
 
         if (notReady && failedInitialization) {
-            getPodEvents(client, pod).stream()
+            getResourceEvents(client, pod).stream()
                     .filter(e -> e.getReason().equals("FailedMount"))
                     .findAny()
                     .ifPresent(
@@ -356,4 +358,20 @@ public class EventUtils {
     private static boolean checkStatusWasAlways(Pod pod, PodCondition 
condition, String status) {
         return condition != null && condition.getStatus().equals(status);
     }
+
+    public static Optional<Instant> findLastJobExceptionTsFromK8s(
+            KubernetesClient client, HasMetadata cr) {
+        var events = getResourceEvents(client, cr);
+        return events.stream()
+                .filter(e -> 
EventRecorder.Reason.JobException.name().equals(e.getReason()))
+                .map(
+                        e ->
+                                Instant.parse(
+                                        e.getMetadata()
+                                                .getAnnotations()
+                                                .getOrDefault(
+                                                        
JobStatusObserver.EXCEPTION_TIMESTAMP,
+                                                        
e.getMetadata().getCreationTimestamp())))
+                .max(Comparator.naturalOrder());
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
new file mode 100644
index 00000000..7cc17ded
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+import static 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Copied from Flink 2.0 to handle API elements that were removed between Flink versions. */
+public class JobExceptionsInfoWithHistory implements ResponseBody {
+
+    public static final String FIELD_NAME_EXCEPTION_HISTORY = 
"exceptionHistory";
+
+    @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+    private final JobExceptionHistory exceptionHistory;
+
+    @JsonCreator
+    public JobExceptionsInfoWithHistory(
+            @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory 
exceptionHistory) {
+        this.exceptionHistory = exceptionHistory;
+    }
+
+    @JsonIgnore
+    public JobExceptionHistory getExceptionHistory() {
+        return exceptionHistory;
+    }
+
+    // hashCode and equals are necessary for the test classes deriving from
+    // RestResponseMarshallingTestBase
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
+        return Objects.equals(exceptionHistory, that.exceptionHistory);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(exceptionHistory);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", 
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+                .add("exceptionHistory=" + exceptionHistory)
+                .toString();
+    }
+
+    /** {@code JobExceptionHistory} collects all previously caught errors. */
+    public static final class JobExceptionHistory {
+
+        public static final String FIELD_NAME_ENTRIES = "entries";
+        public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+        @JsonProperty(FIELD_NAME_ENTRIES)
+        private final List<RootExceptionInfo> entries;
+
+        @JsonProperty(FIELD_NAME_TRUNCATED)
+        private final boolean truncated;
+
+        @JsonCreator
+        public JobExceptionHistory(
+                @JsonProperty(FIELD_NAME_ENTRIES) List<RootExceptionInfo> 
entries,
+                @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+            this.entries = entries;
+            this.truncated = truncated;
+        }
+
+        @JsonIgnore
+        public List<RootExceptionInfo> getEntries() {
+            return entries;
+        }
+
+        @JsonIgnore
+        public boolean isTruncated() {
+            return truncated;
+        }
+
+        // hashCode and equals are necessary for the test classes deriving from
+        // RestResponseMarshallingTestBase
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JobExceptionHistory that = (JobExceptionHistory) o;
+            return this.isTruncated() == that.isTruncated()
+                    && Objects.equals(entries, that.entries);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(entries, truncated);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", 
JobExceptionHistory.class.getSimpleName() + "[", "]")
+                    .add("entries=" + entries)
+                    .add("truncated=" + truncated)
+                    .toString();
+        }
+    }
+
+    /**
+     * Json equivalent of {@link
+     * 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry}.
+     */
+    public static class ExceptionInfo {
+
+        public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
+        public static final String FIELD_NAME_EXCEPTION_STACKTRACE = 
"stacktrace";
+        public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = 
"timestamp";
+        public static final String FIELD_NAME_TASK_NAME = "taskName";
+        public static final String FIELD_NAME_ENDPOINT = "endpoint";
+        public static final String FIELD_NAME_TASK_MANAGER_ID = 
"taskManagerId";
+        public static final String FIELD_NAME_FAILURE_LABELS = "failureLabels";
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_NAME)
+        private final String exceptionName;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE)
+        private final String stacktrace;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP)
+        private final long timestamp;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_TASK_NAME)
+        @Nullable
+        private final String taskName;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_ENDPOINT)
+        @Nullable
+        private final String endpoint;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
+        @Nullable
+        private final String taskManagerId;
+
+        @JsonProperty(FIELD_NAME_FAILURE_LABELS)
+        private final Map<String, String> failureLabels;
+
+        public ExceptionInfo(String exceptionName, String stacktrace, long 
timestamp) {
+            this(exceptionName, stacktrace, timestamp, Collections.emptyMap(), 
null, null, null);
+        }
+
+        @JsonCreator
+        public ExceptionInfo(
+                @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+                @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String 
stacktrace,
+                @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+                @JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> 
failureLabels,
+                @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+                @JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
+                @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String 
taskManagerId) {
+            this.exceptionName = checkNotNull(exceptionName);
+            this.stacktrace = checkNotNull(stacktrace);
+            this.timestamp = timestamp;
+            this.failureLabels = checkNotNull(failureLabels);
+            this.taskName = taskName;
+            this.endpoint = endpoint;
+            this.taskManagerId = taskManagerId;
+        }
+
+        @JsonIgnore
+        public String getExceptionName() {
+            return exceptionName;
+        }
+
+        @JsonIgnore
+        public String getStacktrace() {
+            return stacktrace;
+        }
+
+        @JsonIgnore
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getTaskName() {
+            return taskName;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getEndpoint() {
+            return endpoint;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getTaskManagerId() {
+            return taskManagerId;
+        }
+
+        @JsonIgnore
+        public Map<String, String> getFailureLabels() {
+            return failureLabels;
+        }
+
+        // hashCode and equals are necessary for the test classes deriving from
+        // RestResponseMarshallingTestBase
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ExceptionInfo that = (ExceptionInfo) o;
+            return exceptionName.equals(that.exceptionName)
+                    && stacktrace.equals(that.stacktrace)
+                    && Objects.equals(timestamp, that.timestamp)
+                    && Objects.equals(failureLabels, that.failureLabels)
+                    && Objects.equals(taskName, that.taskName)
+                    && Objects.equals(endpoint, that.endpoint);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    exceptionName, stacktrace, timestamp, failureLabels, 
taskName, endpoint);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() 
+ "[", "]")
+                    .add("exceptionName='" + exceptionName + "'")
+                    .add("stacktrace='" + stacktrace + "'")
+                    .add("timestamp=" + timestamp)
+                    .add("failureLabels=" + failureLabels)
+                    .add("taskName='" + taskName + "'")
+                    .add("endpoint='" + endpoint + "'")
+                    .toString();
+        }
+    }
+
+    /**
+     * Json equivalent of {@link
+     * 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry}.
+     */
+    public static class RootExceptionInfo extends ExceptionInfo {
+
+        public static final String FIELD_NAME_CONCURRENT_EXCEPTIONS = 
"concurrentExceptions";
+
+        @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
+        private final Collection<ExceptionInfo> concurrentExceptions;
+
+        public RootExceptionInfo(
+                String exceptionName,
+                String stacktrace,
+                long timestamp,
+                Map<String, String> failureLabels,
+                Collection<ExceptionInfo> concurrentExceptions) {
+            this(
+                    exceptionName,
+                    stacktrace,
+                    timestamp,
+                    failureLabels,
+                    null,
+                    null,
+                    null,
+                    concurrentExceptions);
+        }
+
+        @JsonCreator
+        public RootExceptionInfo(
+                @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+                @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String 
stacktrace,
+                @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+                @JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> 
failureLabels,
+                @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+                @JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
+                @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String 
taskManagerId,
+                @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
+                        Collection<ExceptionInfo> concurrentExceptions) {
+            super(
+                    exceptionName,
+                    stacktrace,
+                    timestamp,
+                    failureLabels,
+                    taskName,
+                    endpoint,
+                    taskManagerId);
+            this.concurrentExceptions = concurrentExceptions;
+        }
+
+        @JsonIgnore
+        public Collection<ExceptionInfo> getConcurrentExceptions() {
+            return concurrentExceptions;
+        }
+
+        // hashCode and equals are necessary for the test classes deriving from
+        // RestResponseMarshallingTestBase
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+                return false;
+            }
+            RootExceptionInfo that = (RootExceptionInfo) o;
+            return 
getConcurrentExceptions().equals(that.getConcurrentExceptions());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), getConcurrentExceptions());
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", 
RootExceptionInfo.class.getSimpleName() + "[", "]")
+                    .add("exceptionName='" + getExceptionName() + "'")
+                    .add("stacktrace='" + getStacktrace() + "'")
+                    .add("timestamp=" + getTimestamp())
+                    .add("taskName='" + getTaskName() + "'")
+                    .add("endpoint='" + getEndpoint() + "'")
+                    .add("concurrentExceptions=" + getConcurrentExceptions())
+                    .toString();
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 9b7bc2e2..bee769fb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -749,10 +749,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
                         stackTrace,
                         timestamp,
                         Map.of("label-key", "label-value"),
-                        "task-name-1",
-                        "location-1",
-                        "endpoint-1",
-                        "tm-id-1",
                         List.of() // concurrentExceptions
                         );
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index e40ae74e..4f89d04b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -43,6 +43,9 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -167,8 +170,9 @@ public class JobStatusObserverTest extends OperatorTestBase 
{
                 getResourceContext(deployment, operatorConfig);
 
         var jobId = 
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+        
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
         flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 
1000L);
 
         // Ensure jobFailedErr is null before the observe call
@@ -210,8 +214,9 @@ public class JobStatusObserverTest extends OperatorTestBase 
{
                 getResourceContext(deployment, operatorConfig);
 
         var jobId = 
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+        
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
         flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 
1000L);
 
         // Ensure jobFailedErr is null before the observe call
@@ -247,8 +252,9 @@ public class JobStatusObserverTest extends OperatorTestBase 
{
                 getResourceContext(deployment, operatorConfig); // set a 
non-terminal state
 
         var jobId = 
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+        
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
 
         flinkService.submitApplicationCluster(
                 deployment.getSpec().getJob(), 
ctx.getDeployConfig(deployment.getSpec()), false);
@@ -289,8 +295,9 @@ public class JobStatusObserverTest extends OperatorTestBase 
{
         flinkService.submitApplicationCluster(
                 deployment.getSpec().getJob(), 
ctx.getDeployConfig(deployment.getSpec()), false);
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
+        
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
 
         long exceptionTime = 4000L;
         String longTrace = "line1\nline2\nline3\nline4";
@@ -323,8 +330,9 @@ public class JobStatusObserverTest extends OperatorTestBase 
{
         jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
 
         FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = 
getResourceContext(deployment);
+        ctx.getExceptionCacheEntry().setInitialized(true);
         
ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
-        ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
+        
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));
 
         var jobId = 
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
         flinkService.submitApplicationCluster(
@@ -365,6 +373,51 @@ public class JobStatusObserverTest extends 
OperatorTestBase {
         
assertTrue(events.get(0).getMessage().contains("org.apache.NewException"));
     }
 
+    @Test
+    public void testExceptionEventTriggerInitialization() throws Exception { // one observe() with an old and a recent exception in history; checks event count, cache init flag and last timestamp
+        var deployment = initDeployment();
+        var status = deployment.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+
+        FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = 
getResourceContext(deployment);
+
+        var now = Instant.now(); // base instant; the two exception timestamps below are now-1h and now-1min
+        var jobId = 
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        flinkService.submitApplicationCluster(
+                deployment.getSpec().getJob(), 
ctx.getDeployConfig(deployment.getSpec()), false);
+
+        // Old exception that happened outside of kubernetes event retention 
should be ignored
+        flinkService.addExceptionHistory(
+                jobId,
+                "OldException",
+                "OldException",
+                now.minus(Duration.ofHours(1)).toEpochMilli());
+        flinkService.addExceptionHistory(
+                jobId,
+                "NewException",
+                "NewException",
+                now.minus(Duration.ofMinutes(1)).toEpochMilli());
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx); // this test does not call setInitialized/setLastTimestamp on the cache entry beforehand
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(deployment.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(1, events.size()); // exactly one event is expected for the two history entries
+        assertTrue(events.get(0).getMessage().contains("NewException"));
+        assertTrue(ctx.getExceptionCacheEntry().isInitialized()); // the cache entry must be flagged initialized after observe()
+        assertEquals( // expected side is truncated to millis because the history entry was added via toEpochMilli()
+                
now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS),
+                ctx.getExceptionCacheEntry().getLastTimestamp());
+    }
+
     private static Stream<Arguments> cancellingArgs() {
         var args = new ArrayList<Arguments>();
         for (var status : JobStatus.values()) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
similarity index 69%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
index d4e57809..02616004 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
@@ -18,9 +18,11 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 
 import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
 import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodCondition;
@@ -28,16 +30,18 @@ import io.fabric8.kubernetes.api.model.PodConditionBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** Test for {@link EventUtils}. */
 @EnableKubeAPIServer
-public class PodErrorTest {
+public class EventUtilsApiServerTest {
 
     static KubernetesClient client;
 
@@ -145,4 +149,54 @@ public class PodErrorTest {
                         .build();
         client.resource(event).create();
     }
+
+    @Test
+    public void testFindingLatestJobExceptionTs() throws Exception { // expects empty with no events, otherwise the latest exception timestamp among the created events
+        // Testing with pods instead of FlinkDeployments due to the local Kube 
server limitations
+        var pod =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withName("test2")
+                        .withNamespace("default")
+                        .endMetadata()
+                        .withNewStatus()
+                        .endStatus()
+                        .build();
+
+        assertEquals(Optional.empty(), 
EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+
+        var now = Instant.now(); // exception timestamp of ex1; ex2 and ex3 are offset from it by +1s and -1s
+        createExceptionEvent(pod, "ex1", now);
+        assertEquals(Optional.of(now), 
EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+
+        createExceptionEvent(pod, "ex2", now.plus(Duration.ofSeconds(1)));
+        createExceptionEvent(pod, "ex3", now.minus(Duration.ofSeconds(1))); // created last but carries the oldest timestamp
+
+        assertEquals( // ex2's timestamp is expected even though ex3 was created after it
+                Optional.of(now.plus(Duration.ofSeconds(1))),
+                EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+    }
+
+    private static void createExceptionEvent(HasMetadata resource, String 
name, Instant now) { // creates a JobException event for resource; the now argument becomes its EXCEPTION_TIMESTAMP annotation
+        var event =
+                new EventBuilder()
+                        .withApiVersion("v1")
+                        
.withInvolvedObject(EventUtils.getObjectReference(resource))
+                        .withType("type")
+                        .withReason(EventRecorder.Reason.JobException.name())
+                        .withFirstTimestamp(Instant.now().toString()) // wall clock at call time, not the now parameter
+                        .withLastTimestamp(Instant.now().toString()) // separate Instant.now() call, so it may differ slightly from firstTimestamp
+                        .withNewSource()
+                        .withComponent(EventRecorder.Component.Job.name())
+                        .endSource()
+                        .withCount(1)
+                        .withMessage("m")
+                        .withNewMetadata()
+                        .withName(name)
+                        .withNamespace(resource.getMetadata().getNamespace())
+                        
.addToAnnotations(JobStatusObserver.EXCEPTION_TIMESTAMP, now.toString()) // the now parameter is stored here in Instant.toString() form
+                        .endMetadata()
+                        .build();
+        client.resource(event).create(); // created through the class-level static client
+    }
 }


Reply via email to