This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 3c60c3c4 [FLINK-37730] Improve exception recording ts initialization +
2.0 compatibility
3c60c3c4 is described below
commit 3c60c3c45e33e16f7c1a26a53b634914f6ca863c
Author: Gyula Fora <[email protected]>
AuthorDate: Tue May 27 10:34:31 2025 +0200
[FLINK-37730] Improve exception recording ts initialization + 2.0
compatibility
---
.gitignore | 2 +
.../operator/observer/JobStatusObserver.java | 105 +++---
.../service/FlinkResourceContextFactory.java | 20 +-
.../kubernetes/operator/utils/EventUtils.java | 26 +-
.../messages/JobExceptionsInfoWithHistory.java | 367 +++++++++++++++++++++
.../kubernetes/operator/TestingFlinkService.java | 4 -
.../operator/observer/JobStatusObserverTest.java | 63 +++-
...ErrorTest.java => EventUtilsApiServerTest.java} | 56 +++-
8 files changed, 568 insertions(+), 75 deletions(-)
diff --git a/.gitignore b/.gitignore
index bb85f4bb..2db0ebc1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,5 @@ buildNumber.properties
.idea
*.iml
*.DS_Store
+
+.kube
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index caf4a32a..0c875613 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -27,6 +27,7 @@ import
org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -35,12 +36,12 @@ import
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.time.Instant;
-import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
@@ -53,6 +54,8 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
private static final Logger LOG =
LoggerFactory.getLogger(JobStatusObserver.class);
public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
+ public static final String EXCEPTION_TIMESTAMP = "exception-timestamp";
+ public static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);
protected final EventRecorder eventRecorder;
@@ -132,65 +135,77 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
}
var exceptionHistory = history.getExceptionHistory();
- List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
- exceptionHistory.getEntries();
- if (exceptions == null || exceptions.isEmpty()) {
- return;
+ var exceptions = exceptionHistory.getEntries();
+ if (exceptions != null) {
+ exceptions = new ArrayList<>(exceptions);
+ exceptions.sort(
+ Comparator.comparingLong(
+
JobExceptionsInfoWithHistory.RootExceptionInfo
+ ::getTimestamp)
+ .reversed());
+ } else {
+ exceptions = Collections.emptyList();
}
- if (exceptionHistory.isTruncated()) {
- LOG.warn(
- "Job exception history is truncated for jobId '{}'.
Some exceptions may be missing.",
- jobId);
+ String currentJobId = jobStatus.getJobId();
+ var cacheEntry = ctx.getExceptionCacheEntry();
+
+ if (!cacheEntry.isInitialized()) {
+ Instant lastExceptionTs;
+ if (exceptions.isEmpty()) {
+ // If the job doesn't have any exceptions yet, set the timestamp to MIN, as we
+ // always have to record the next one
+ lastExceptionTs = Instant.MIN;
+ } else {
+ var k8sExpirationTs =
Instant.now().minus(MAX_K8S_EVENT_AGE);
+ var maxJobExceptionTs =
Instant.ofEpochMilli(exceptions.get(0).getTimestamp());
+ if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
+ // If the last job exception is older than the max k8s event age, there is no
+ // point in checking k8s. We won't report it as a new exception
+ lastExceptionTs = maxJobExceptionTs;
+ } else {
+ // If there were recent exceptions, we check the events already triggered in
+ // k8s to make sure we don't trigger them twice
+ lastExceptionTs =
+ EventUtils.findLastJobExceptionTsFromK8s(
+ ctx.getKubernetesClient(),
resource)
+ .orElse(k8sExpirationTs);
+ }
+ }
+
+ cacheEntry.setLastTimestamp(lastExceptionTs);
+ cacheEntry.setInitialized(true);
+ cacheEntry.setJobId(currentJobId);
}
- String currentJobId = jobStatus.getJobId();
- Instant lastRecorded = null; // first reconciliation
+ var lastRecorded =
+ currentJobId.equals(cacheEntry.getJobId())
+ ? cacheEntry.getLastTimestamp()
+ : Instant.MIN;
- var cacheEntry = ctx.getExceptionCacheEntry();
- // a cache entry is created should always be present. The
timestamp for the first
- // reconciliation would be
- // when the job was created. This check is still necessary because
even though there
- // might be an entry,
- // the jobId could have changed since the job was first created.
- if (cacheEntry.getJobId() != null &&
cacheEntry.getJobId().equals(currentJobId)) {
- lastRecorded =
Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
+ if (exceptions.isEmpty()) {
+ return;
}
int maxEvents =
operatorConfig.getReportedExceptionEventsMaxCount();
int maxStackTraceLines =
operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
- // Sort and reverse to prioritize the newest exceptions
- var sortedExceptions = new ArrayList<>(exceptions);
- sortedExceptions.sort(
- Comparator.comparingLong(
-
JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
- .reversed());
int count = 0;
- Instant latestSeen = null;
-
- for (var exception : sortedExceptions) {
- Instant exceptionTime =
Instant.ofEpochMilli(exception.getTimestamp());
- // Skip already recorded exceptions
- if (lastRecorded != null &&
!exceptionTime.isAfter(lastRecorded)) {
+ for (var exception : exceptions) {
+ var exceptionTime =
Instant.ofEpochMilli(exception.getTimestamp());
+ // Stop at the first already recorded exception or once the max count is reached
+ if (!exceptionTime.isAfter(lastRecorded) || count++ >=
maxEvents) {
break;
}
emitJobManagerExceptionEvent(ctx, exception, exceptionTime,
maxStackTraceLines);
- if (latestSeen == null) {
- latestSeen = exceptionTime;
- }
- if (++count >= maxEvents) {
- break;
- }
}
- ctx.getExceptionCacheEntry().setJobId(currentJobId);
- // Set to the timestamp of the latest emitted exception, if any
were emitted
- // the other option is that if no exceptions were emitted, we set
this to now.
- if (latestSeen != null) {
-
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
+ if (count > maxEvents) {
+ LOG.warn("Job exception history is truncated. Some exceptions
may be missing.");
}
+ cacheEntry.setJobId(currentJobId);
+
cacheEntry.setLastTimestamp(Instant.ofEpochMilli(exceptions.get(0).getTimestamp()));
} catch (Exception e) {
LOG.warn("Failed to fetch JobManager exception info.", e);
}
@@ -203,9 +218,7 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
int maxStackTraceLines) {
Map<String, String> annotations = new HashMap<>();
if (exceptionTime != null) {
- annotations.put(
- "exception-timestamp",
-
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
+ annotations.put(EXCEPTION_TIMESTAMP, exceptionTime.toString());
}
if (exception.getTaskName() != null) {
annotations.put("task-name", exception.getTaskName());
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index 8228ffbf..f2f75a52 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -43,6 +43,7 @@ import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -57,12 +58,8 @@ public class FlinkResourceContextFactory {
@Data
public static final class ExceptionCacheEntry {
private String jobId;
- private long lastTimestamp;
-
- public ExceptionCacheEntry(String jobId, long lastTimestamp) {
- this.jobId = jobId;
- this.lastTimestamp = lastTimestamp;
- }
+ private Instant lastTimestamp;
+ private boolean initialized;
}
@VisibleForTesting
@@ -128,10 +125,7 @@ public class FlinkResourceContextFactory {
configManager,
this::getFlinkService,
lastRecordedExceptionCache.computeIfAbsent(
- resourceId,
- id ->
- new ExceptionCacheEntry(
- flinkDepJobId,
System.currentTimeMillis())));
+ resourceId, id -> new
ExceptionCacheEntry()));
} else if (resource instanceof FlinkSessionJob) {
var resourceId = ResourceID.fromResource(resource);
var flinkSessionJobId = jobId;
@@ -143,11 +137,7 @@ public class FlinkResourceContextFactory {
configManager,
this::getFlinkService,
lastRecordedExceptionCache.computeIfAbsent(
- resourceId,
- id ->
- new ExceptionCacheEntry(
- flinkSessionJobId,
-
System.currentTimeMillis())));
+ resourceId, id -> new
ExceptionCacheEntry()));
} else {
throw new IllegalArgumentException(
"Unknown resource type " +
resource.getClass().getSimpleName());
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index df6232d4..faac4829 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.annotation.VisibleForTesting;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -38,6 +39,7 @@ import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -283,13 +285,13 @@ public class EventUtils {
return Optional.empty();
}
- private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
- var ref = getObjectReference(pod);
+ private static List<Event> getResourceEvents(KubernetesClient client,
HasMetadata cr) {
+ var ref = getObjectReference(cr);
var eventList =
client.v1()
.events()
- .inNamespace(pod.getMetadata().getNamespace())
+ .inNamespace(cr.getMetadata().getNamespace())
.withInvolvedObject(ref)
.list();
@@ -343,7 +345,7 @@ public class EventUtils {
boolean notReady = checkStatusWasAlways(pod,
conditionMap.get("Ready"), "False");
if (notReady && failedInitialization) {
- getPodEvents(client, pod).stream()
+ getResourceEvents(client, pod).stream()
.filter(e -> e.getReason().equals("FailedMount"))
.findAny()
.ifPresent(
@@ -356,4 +358,20 @@ public class EventUtils {
private static boolean checkStatusWasAlways(Pod pod, PodCondition
condition, String status) {
return condition != null && condition.getStatus().equals(status);
}
+
+ public static Optional<Instant> findLastJobExceptionTsFromK8s(
+ KubernetesClient client, HasMetadata cr) {
+ var events = getResourceEvents(client, cr);
+ return events.stream()
+ .filter(e ->
EventRecorder.Reason.JobException.name().equals(e.getReason()))
+ .map(
+ e ->
+ Instant.parse(
+ e.getMetadata()
+ .getAnnotations()
+ .getOrDefault(
+
JobStatusObserver.EXCEPTION_TIMESTAMP,
+
e.getMetadata().getCreationTimestamp())))
+ .max(Comparator.naturalOrder());
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
new file mode 100644
index 00000000..7cc17ded
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+import static
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Copied from Flink 2.0 for compatibility, to handle the API removals made in that version. */
+public class JobExceptionsInfoWithHistory implements ResponseBody {
+
+ public static final String FIELD_NAME_EXCEPTION_HISTORY =
"exceptionHistory";
+
+ @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+ private final JobExceptionHistory exceptionHistory;
+
+ @JsonCreator
+ public JobExceptionsInfoWithHistory(
+ @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory
exceptionHistory) {
+ this.exceptionHistory = exceptionHistory;
+ }
+
+ @JsonIgnore
+ public JobExceptionHistory getExceptionHistory() {
+ return exceptionHistory;
+ }
+
+ // hashCode and equals are necessary for the test classes deriving from
+ // RestResponseMarshallingTestBase
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
+ return Objects.equals(exceptionHistory, that.exceptionHistory);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(exceptionHistory);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+ .add("exceptionHistory=" + exceptionHistory)
+ .toString();
+ }
+
+ /** {@code JobExceptionHistory} collects all previously caught errors. */
+ public static final class JobExceptionHistory {
+
+ public static final String FIELD_NAME_ENTRIES = "entries";
+ public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+ @JsonProperty(FIELD_NAME_ENTRIES)
+ private final List<RootExceptionInfo> entries;
+
+ @JsonProperty(FIELD_NAME_TRUNCATED)
+ private final boolean truncated;
+
+ @JsonCreator
+ public JobExceptionHistory(
+ @JsonProperty(FIELD_NAME_ENTRIES) List<RootExceptionInfo>
entries,
+ @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+ this.entries = entries;
+ this.truncated = truncated;
+ }
+
+ @JsonIgnore
+ public List<RootExceptionInfo> getEntries() {
+ return entries;
+ }
+
+ @JsonIgnore
+ public boolean isTruncated() {
+ return truncated;
+ }
+
+ // hashCode and equals are necessary for the test classes deriving from
+ // RestResponseMarshallingTestBase
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobExceptionHistory that = (JobExceptionHistory) o;
+ return this.isTruncated() == that.isTruncated()
+ && Objects.equals(entries, that.entries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entries, truncated);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
JobExceptionHistory.class.getSimpleName() + "[", "]")
+ .add("entries=" + entries)
+ .add("truncated=" + truncated)
+ .toString();
+ }
+ }
+
+ /**
+ * Json equivalent of {@link
+ *
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry}.
+ */
+ public static class ExceptionInfo {
+
+ public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
+ public static final String FIELD_NAME_EXCEPTION_STACKTRACE =
"stacktrace";
+ public static final String FIELD_NAME_EXCEPTION_TIMESTAMP =
"timestamp";
+ public static final String FIELD_NAME_TASK_NAME = "taskName";
+ public static final String FIELD_NAME_ENDPOINT = "endpoint";
+ public static final String FIELD_NAME_TASK_MANAGER_ID =
"taskManagerId";
+ public static final String FIELD_NAME_FAILURE_LABELS = "failureLabels";
+
+ @JsonProperty(FIELD_NAME_EXCEPTION_NAME)
+ private final String exceptionName;
+
+ @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE)
+ private final String stacktrace;
+
+ @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP)
+ private final long timestamp;
+
+ @JsonInclude(NON_NULL)
+ @JsonProperty(FIELD_NAME_TASK_NAME)
+ @Nullable
+ private final String taskName;
+
+ @JsonInclude(NON_NULL)
+ @JsonProperty(FIELD_NAME_ENDPOINT)
+ @Nullable
+ private final String endpoint;
+
+ @JsonInclude(NON_NULL)
+ @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
+ @Nullable
+ private final String taskManagerId;
+
+ @JsonProperty(FIELD_NAME_FAILURE_LABELS)
+ private final Map<String, String> failureLabels;
+
+ public ExceptionInfo(String exceptionName, String stacktrace, long
timestamp) {
+ this(exceptionName, stacktrace, timestamp, Collections.emptyMap(),
null, null, null);
+ }
+
+ @JsonCreator
+ public ExceptionInfo(
+ @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+ @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String
stacktrace,
+ @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+ @JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String>
failureLabels,
+ @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+ @JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
+ @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String
taskManagerId) {
+ this.exceptionName = checkNotNull(exceptionName);
+ this.stacktrace = checkNotNull(stacktrace);
+ this.timestamp = timestamp;
+ this.failureLabels = checkNotNull(failureLabels);
+ this.taskName = taskName;
+ this.endpoint = endpoint;
+ this.taskManagerId = taskManagerId;
+ }
+
+ @JsonIgnore
+ public String getExceptionName() {
+ return exceptionName;
+ }
+
+ @JsonIgnore
+ public String getStacktrace() {
+ return stacktrace;
+ }
+
+ @JsonIgnore
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @JsonIgnore
+ @Nullable
+ public String getTaskName() {
+ return taskName;
+ }
+
+ @JsonIgnore
+ @Nullable
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ @JsonIgnore
+ @Nullable
+ public String getTaskManagerId() {
+ return taskManagerId;
+ }
+
+ @JsonIgnore
+ public Map<String, String> getFailureLabels() {
+ return failureLabels;
+ }
+
+ // hashCode and equals are necessary for the test classes deriving from
+ // RestResponseMarshallingTestBase
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExceptionInfo that = (ExceptionInfo) o;
+ return exceptionName.equals(that.exceptionName)
+ && stacktrace.equals(that.stacktrace)
+ && Objects.equals(timestamp, that.timestamp)
+ && Objects.equals(failureLabels, that.failureLabels)
+ && Objects.equals(taskName, that.taskName)
+ && Objects.equals(endpoint, that.endpoint);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ exceptionName, stacktrace, timestamp, failureLabels,
taskName, endpoint);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", ExceptionInfo.class.getSimpleName()
+ "[", "]")
+ .add("exceptionName='" + exceptionName + "'")
+ .add("stacktrace='" + stacktrace + "'")
+ .add("timestamp=" + timestamp)
+ .add("failureLabels=" + failureLabels)
+ .add("taskName='" + taskName + "'")
+ .add("endpoint='" + endpoint + "'")
+ .toString();
+ }
+ }
+
+ /**
+ * Json equivalent of {@link
+ *
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry}.
+ */
+ public static class RootExceptionInfo extends ExceptionInfo {
+
+ public static final String FIELD_NAME_CONCURRENT_EXCEPTIONS =
"concurrentExceptions";
+
+ @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
+ private final Collection<ExceptionInfo> concurrentExceptions;
+
+ public RootExceptionInfo(
+ String exceptionName,
+ String stacktrace,
+ long timestamp,
+ Map<String, String> failureLabels,
+ Collection<ExceptionInfo> concurrentExceptions) {
+ this(
+ exceptionName,
+ stacktrace,
+ timestamp,
+ failureLabels,
+ null,
+ null,
+ null,
+ concurrentExceptions);
+ }
+
+ @JsonCreator
+ public RootExceptionInfo(
+ @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+ @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String
stacktrace,
+ @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+ @JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String>
failureLabels,
+ @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+ @JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
+ @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String
taskManagerId,
+ @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
+ Collection<ExceptionInfo> concurrentExceptions) {
+ super(
+ exceptionName,
+ stacktrace,
+ timestamp,
+ failureLabels,
+ taskName,
+ endpoint,
+ taskManagerId);
+ this.concurrentExceptions = concurrentExceptions;
+ }
+
+ @JsonIgnore
+ public Collection<ExceptionInfo> getConcurrentExceptions() {
+ return concurrentExceptions;
+ }
+
+ // hashCode and equals are necessary for the test classes deriving from
+ // RestResponseMarshallingTestBase
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+ return false;
+ }
+ RootExceptionInfo that = (RootExceptionInfo) o;
+ return
getConcurrentExceptions().equals(that.getConcurrentExceptions());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), getConcurrentExceptions());
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
RootExceptionInfo.class.getSimpleName() + "[", "]")
+ .add("exceptionName='" + getExceptionName() + "'")
+ .add("stacktrace='" + getStacktrace() + "'")
+ .add("timestamp=" + getTimestamp())
+ .add("taskName='" + getTaskName() + "'")
+ .add("endpoint='" + getEndpoint() + "'")
+ .add("concurrentExceptions=" + getConcurrentExceptions())
+ .toString();
+ }
+ }
+}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 9b7bc2e2..bee769fb 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -749,10 +749,6 @@ public class TestingFlinkService extends
AbstractFlinkService {
stackTrace,
timestamp,
Map.of("label-key", "label-value"),
- "task-name-1",
- "location-1",
- "endpoint-1",
- "tm-id-1",
List.of() // concurrentExceptions
);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index e40ae74e..4f89d04b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -43,6 +43,9 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -167,8 +170,9 @@ public class JobStatusObserverTest extends OperatorTestBase
{
getResourceContext(deployment, operatorConfig);
var jobId =
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
- ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1",
1000L);
// Ensure jobFailedErr is null before the observe call
@@ -210,8 +214,9 @@ public class JobStatusObserverTest extends OperatorTestBase
{
getResourceContext(deployment, operatorConfig);
var jobId =
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
- ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1",
1000L);
// Ensure jobFailedErr is null before the observe call
@@ -247,8 +252,9 @@ public class JobStatusObserverTest extends OperatorTestBase
{
getResourceContext(deployment, operatorConfig); // set a
non-terminal state
var jobId =
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
- ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
flinkService.submitApplicationCluster(
deployment.getSpec().getJob(),
ctx.getDeployConfig(deployment.getSpec()), false);
@@ -289,8 +295,9 @@ public class JobStatusObserverTest extends OperatorTestBase
{
flinkService.submitApplicationCluster(
deployment.getSpec().getJob(),
ctx.getDeployConfig(deployment.getSpec()), false);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
+ ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
- ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
long exceptionTime = 4000L;
String longTrace = "line1\nline2\nline3\nline4";
@@ -323,8 +330,9 @@ public class JobStatusObserverTest extends OperatorTestBase
{
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
getResourceContext(deployment);
+ ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
- ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));
var jobId =
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
flinkService.submitApplicationCluster(
@@ -365,6 +373,51 @@ public class JobStatusObserverTest extends
OperatorTestBase {
assertTrue(events.get(0).getMessage().contains("org.apache.NewException"));
}
+ @Test
+ public void testExceptionEventTriggerInitialization() throws Exception {
+ var deployment = initDeployment();
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+
+ FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
getResourceContext(deployment);
+
+ var now = Instant.now();
+ var jobId =
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ flinkService.submitApplicationCluster(
+ deployment.getSpec().getJob(),
ctx.getDeployConfig(deployment.getSpec()), false);
+
+ // Old exception that happened outside of kubernetes event retention
should be ignored
+ flinkService.addExceptionHistory(
+ jobId,
+ "OldException",
+ "OldException",
+ now.minus(Duration.ofHours(1)).toEpochMilli());
+ flinkService.addExceptionHistory(
+ jobId,
+ "NewException",
+ "NewException",
+ now.minus(Duration.ofMinutes(1)).toEpochMilli());
+
+ // Ensure jobFailedErr is null before the observe call
+ flinkService.setJobFailedErr(null);
+ observer.observe(ctx);
+
+ var events =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ assertEquals(1, events.size());
+ assertTrue(events.get(0).getMessage().contains("NewException"));
+ assertTrue(ctx.getExceptionCacheEntry().isInitialized());
+ assertEquals(
+
now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS),
+ ctx.getExceptionCacheEntry().getLastTimestamp());
+ }
+
private static Stream<Arguments> cancellingArgs() {
var args = new ArrayList<Arguments>();
for (var status : JobStatus.values()) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
similarity index 69%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
index d4e57809..02616004 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
@@ -18,9 +18,11 @@
package org.apache.flink.kubernetes.operator.utils;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodCondition;
@@ -28,16 +30,18 @@ import io.fabric8.kubernetes.api.model.PodConditionBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/** Test for {@link EventUtils}. */
@EnableKubeAPIServer
-public class PodErrorTest {
+public class EventUtilsApiServerTest {
static KubernetesClient client;
@@ -145,4 +149,54 @@ public class PodErrorTest {
.build();
client.resource(event).create();
}
+
+ @Test
+ public void testFindingLatestJobExceptionTs() throws Exception {
+ // Testing with pods instead of FlinkDeployments due to the local Kube
server limitations
+ var pod =
+ new PodBuilder()
+ .withNewMetadata()
+ .withName("test2")
+ .withNamespace("default")
+ .endMetadata()
+ .withNewStatus()
+ .endStatus()
+ .build();
+
+ assertEquals(Optional.empty(),
EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+
+ var now = Instant.now();
+ createExceptionEvent(pod, "ex1", now);
+ assertEquals(Optional.of(now),
EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+
+ createExceptionEvent(pod, "ex2", now.plus(Duration.ofSeconds(1)));
+ createExceptionEvent(pod, "ex3", now.minus(Duration.ofSeconds(1)));
+
+ assertEquals(
+ Optional.of(now.plus(Duration.ofSeconds(1))),
+ EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+ }
+
+ private static void createExceptionEvent(HasMetadata resource, String
name, Instant now) {
+ var event =
+ new EventBuilder()
+ .withApiVersion("v1")
+
.withInvolvedObject(EventUtils.getObjectReference(resource))
+ .withType("type")
+ .withReason(EventRecorder.Reason.JobException.name())
+ .withFirstTimestamp(Instant.now().toString())
+ .withLastTimestamp(Instant.now().toString())
+ .withNewSource()
+ .withComponent(EventRecorder.Component.Job.name())
+ .endSource()
+ .withCount(1)
+ .withMessage("m")
+ .withNewMetadata()
+ .withName(name)
+ .withNamespace(resource.getMetadata().getNamespace())
+
.addToAnnotations(JobStatusObserver.EXCEPTION_TIMESTAMP, now.toString())
+ .endMetadata()
+ .build();
+ client.resource(event).create();
+ }
}