This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

The following commit(s) were added to refs/heads/main by this push:
     new 3c60c3c4  [FLINK-37730] Improve exception recording ts initialization + 2.0 compatibility
3c60c3c4 is described below

commit 3c60c3c45e33e16f7c1a26a53b634914f6ca863c
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue May 27 10:34:31 2025 +0200

    [FLINK-37730] Improve exception recording ts initialization + 2.0 compatibility
---
 .gitignore                                         |   2 +
 .../operator/observer/JobStatusObserver.java       | 105 +++---
 .../service/FlinkResourceContextFactory.java       |  20 +-
 .../kubernetes/operator/utils/EventUtils.java      |  26 +-
 .../messages/JobExceptionsInfoWithHistory.java     | 367 +++++++++++++++++++++
 .../kubernetes/operator/TestingFlinkService.java   |   4 -
 .../operator/observer/JobStatusObserverTest.java   |  63 +++-
 ...ErrorTest.java => EventUtilsApiServerTest.java} |  56 +++-
 8 files changed, 568 insertions(+), 75 deletions(-)

diff --git a/.gitignore b/.gitignore
index bb85f4bb..2db0ebc1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,5 @@ buildNumber.properties
 .idea
 *.iml
 *.DS_Store
+
+.kube
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index caf4a32a..0c875613 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
 import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -35,12 +36,12 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.time.Instant;
-import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeoutException;
@@ -53,6 +54,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
 
     public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
+    public static final String EXCEPTION_TIMESTAMP = "exception-timestamp";
+    public static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);
 
     protected final EventRecorder eventRecorder;
 
@@ -132,65 +135,77 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
         }
 
         var exceptionHistory = history.getExceptionHistory();
-        List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
-                exceptionHistory.getEntries();
-        if (exceptions == null || exceptions.isEmpty()) {
-            return;
+        var exceptions = exceptionHistory.getEntries();
+        if (exceptions != null) {
+            exceptions = new ArrayList<>(exceptions);
+            exceptions.sort(
+                    Comparator.comparingLong(
+                                    JobExceptionsInfoWithHistory.RootExceptionInfo
+                                            ::getTimestamp)
+                            .reversed());
+        } else {
+            exceptions = Collections.emptyList();
         }
 
-        if (exceptionHistory.isTruncated()) {
-            LOG.warn(
-                    "Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
-                    jobId);
+        String currentJobId = jobStatus.getJobId();
+        var cacheEntry = ctx.getExceptionCacheEntry();
+
+        if (!cacheEntry.isInitialized()) {
+            Instant lastExceptionTs;
+            if (exceptions.isEmpty()) {
+                // If the job doesn't have any exceptions, set to MIN as we always have to
+                // record the next one
+                lastExceptionTs = Instant.MIN;
+            } else {
+                var k8sExpirationTs = Instant.now().minus(MAX_K8S_EVENT_AGE);
+                var maxJobExceptionTs = Instant.ofEpochMilli(exceptions.get(0).getTimestamp());
+                if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
+                    // If the last job exception was a long time ago, there is no point in
+                    // checking in k8s. We won't report this as an exception
+                    lastExceptionTs = maxJobExceptionTs;
+                } else {
+                    // If there were recent exceptions, we check the triggered events from kube
+                    // to make sure we don't double trigger
+                    lastExceptionTs =
+                            EventUtils.findLastJobExceptionTsFromK8s(
+                                            ctx.getKubernetesClient(), resource)
+                                    .orElse(k8sExpirationTs);
+                }
+            }
+
+            cacheEntry.setLastTimestamp(lastExceptionTs);
+            cacheEntry.setInitialized(true);
+            cacheEntry.setJobId(currentJobId);
         }
 
-        String currentJobId = jobStatus.getJobId();
-        Instant lastRecorded = null; // first reconciliation
+        var lastRecorded =
+                currentJobId.equals(cacheEntry.getJobId())
+                        ? cacheEntry.getLastTimestamp()
+                        : Instant.MIN;
 
-        var cacheEntry = ctx.getExceptionCacheEntry();
-        // a cache entry is created should always be present. The timestamp for the first
-        // reconciliation would be
-        // when the job was created. This check is still necessary because even though there
-        // might be an entry,
-        // the jobId could have changed since the job was first created.
-        if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
-            lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
+        if (exceptions.isEmpty()) {
+            return;
         }
 
         int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
         int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
 
-        // Sort and reverse to prioritize the newest exceptions
-        var sortedExceptions = new ArrayList<>(exceptions);
-        sortedExceptions.sort(
-                Comparator.comparingLong(
-                                JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
-                        .reversed());
         int count = 0;
-        Instant latestSeen = null;
-
-        for (var exception : sortedExceptions) {
-            Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
-            // Skip already recorded exceptions
-            if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
+        for (var exception : exceptions) {
+            var exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
+            // Skip exceptions that were already recorded and stop after the max count
+            if (!exceptionTime.isAfter(lastRecorded) || count++ >= maxEvents) {
                 break;
             }
             emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
-            if (latestSeen == null) {
-                latestSeen = exceptionTime;
-            }
-            if (++count >= maxEvents) {
-                break;
-            }
         }
-        ctx.getExceptionCacheEntry().setJobId(currentJobId);
-        // Set to the timestamp of the latest emitted exception, if any were emitted
-        // the other option is that if no exceptions were emitted, we set this to now.
-        if (latestSeen != null) {
-            ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
+        if (count > maxEvents) {
+            LOG.warn("Job exception history is truncated. Some exceptions may be missing.");
         }
+        cacheEntry.setJobId(currentJobId);
+        cacheEntry.setLastTimestamp(Instant.ofEpochMilli(exceptions.get(0).getTimestamp()));
     } catch (Exception e) {
         LOG.warn("Failed to fetch JobManager exception info.", e);
     }
@@ -203,9 +218,7 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
             int maxStackTraceLines) {
         Map<String, String> annotations = new HashMap<>();
         if (exceptionTime != null) {
-            annotations.put(
-                    "exception-timestamp",
-                    exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
+            annotations.put(EXCEPTION_TIMESTAMP, exceptionTime.toString());
         }
         if (exception.getTaskName() != null) {
             annotations.put("task-name", exception.getTaskName());
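[Editor's note, not part of the commit: the initialization decision above, reduced to a
standalone sketch. The names exceptionTs (REST history timestamps, newest first) and
k8sLookup (standing in for EventUtils.findLastJobExceptionTsFromK8s) are hypothetical.]

    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Supplier;

    final class WatermarkInitSketch {
        static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);

        // Decide the initial "last recorded exception" watermark for a job.
        static Instant initialWatermark(
                List<Instant> exceptionTs, Supplier<Optional<Instant>> k8sLookup) {
            if (exceptionTs.isEmpty()) {
                // No exceptions yet: MIN guarantees the next one is always reported.
                return Instant.MIN;
            }
            Instant k8sExpirationTs = Instant.now().minus(MAX_K8S_EVENT_AGE);
            Instant newest = exceptionTs.get(0);
            if (newest.isBefore(k8sExpirationTs)) {
                // Older than any surviving k8s event: treat as already handled.
                return newest;
            }
            // Recent exceptions: consult previously emitted k8s events to avoid duplicates.
            return k8sLookup.get().orElse(k8sExpirationTs);
        }
    }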
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index 8228ffbf..f2f75a52 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -43,6 +43,7 @@ import lombok.Data;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -57,12 +58,8 @@ public class FlinkResourceContextFactory {
     @Data
     public static final class ExceptionCacheEntry {
         private String jobId;
-        private long lastTimestamp;
-
-        public ExceptionCacheEntry(String jobId, long lastTimestamp) {
-            this.jobId = jobId;
-            this.lastTimestamp = lastTimestamp;
-        }
+        private Instant lastTimestamp;
+        private boolean initialized;
     }
 
     @VisibleForTesting
@@ -128,10 +125,7 @@ public class FlinkResourceContextFactory {
                     configManager,
                     this::getFlinkService,
                     lastRecordedExceptionCache.computeIfAbsent(
-                            resourceId,
-                            id ->
-                                    new ExceptionCacheEntry(
-                                            flinkDepJobId, System.currentTimeMillis())));
+                            resourceId, id -> new ExceptionCacheEntry()));
         } else if (resource instanceof FlinkSessionJob) {
             var resourceId = ResourceID.fromResource(resource);
             var flinkSessionJobId = jobId;
@@ -143,11 +137,7 @@ public class FlinkResourceContextFactory {
                     configManager,
                     this::getFlinkService,
                     lastRecordedExceptionCache.computeIfAbsent(
-                            resourceId,
-                            id ->
-                                    new ExceptionCacheEntry(
-                                            flinkSessionJobId,
-                                            System.currentTimeMillis())));
+                            resourceId, id -> new ExceptionCacheEntry()));
         } else {
             throw new IllegalArgumentException(
                     "Unknown resource type " + resource.getClass().getSimpleName());
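[Editor's note, not part of the commit: ExceptionCacheEntry now relies on Lombok's @Data
for its accessors. Hand-expanded, the generated API used elsewhere in this commit looks
roughly like the following sketch, assuming standard Lombok semantics.]

    import java.time.Instant;

    final class ExceptionCacheEntrySketch {
        private String jobId;
        private Instant lastTimestamp;
        private boolean initialized;

        public String getJobId() { return jobId; }
        public void setJobId(String jobId) { this.jobId = jobId; }
        public Instant getLastTimestamp() { return lastTimestamp; }
        public void setLastTimestamp(Instant ts) { this.lastTimestamp = ts; }
        public boolean isInitialized() { return initialized; }
        public void setInitialized(boolean initialized) { this.initialized = initialized; }
        // @Data additionally generates equals, hashCode and toString
    }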
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index df6232d4..faac4829 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -38,6 +39,7 @@ import java.net.HttpURLConnection;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -283,13 +285,13 @@ public class EventUtils {
         return Optional.empty();
     }
 
-    private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
-        var ref = getObjectReference(pod);
+    private static List<Event> getResourceEvents(KubernetesClient client, HasMetadata cr) {
+        var ref = getObjectReference(cr);
 
         var eventList =
                 client.v1()
                         .events()
-                        .inNamespace(pod.getMetadata().getNamespace())
+                        .inNamespace(cr.getMetadata().getNamespace())
                         .withInvolvedObject(ref)
                         .list();
 
@@ -343,7 +345,7 @@ public class EventUtils {
             boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");
 
             if (notReady && failedInitialization) {
-                getPodEvents(client, pod).stream()
+                getResourceEvents(client, pod).stream()
                         .filter(e -> e.getReason().equals("FailedMount"))
                         .findAny()
                         .ifPresent(
@@ -356,4 +358,20 @@ public class EventUtils {
     private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
         return condition != null && condition.getStatus().equals(status);
     }
+
+    public static Optional<Instant> findLastJobExceptionTsFromK8s(
+            KubernetesClient client, HasMetadata cr) {
+        var events = getResourceEvents(client, cr);
+        return events.stream()
+                .filter(e -> EventRecorder.Reason.JobException.name().equals(e.getReason()))
+                .map(
+                        e ->
+                                Instant.parse(
+                                        e.getMetadata()
+                                                .getAnnotations()
+                                                .getOrDefault(
+                                                        JobStatusObserver.EXCEPTION_TIMESTAMP,
+                                                        e.getMetadata().getCreationTimestamp())))
+                .max(Comparator.naturalOrder());
+    }
 }
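[Editor's note, not part of the commit: a usage sketch for the new helper. It assumes a
configured fabric8 KubernetesClient and the observed custom resource; the wrapper class
and method names are hypothetical.]

    import io.fabric8.kubernetes.api.model.HasMetadata;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
    import org.apache.flink.kubernetes.operator.utils.EventUtils;

    import java.time.Instant;

    final class FindLastExceptionTsSketch {
        // Watermark = timestamp of the newest JobException event (taken from the
        // exception-timestamp annotation, else the event creation time), falling
        // back to the retention cutoff when no such event exists.
        static Instant watermark(KubernetesClient client, HasMetadata resource) {
            Instant cutoff = Instant.now().minus(JobStatusObserver.MAX_K8S_EVENT_AGE);
            return EventUtils.findLastJobExceptionTsFromK8s(client, resource).orElse(cutoff);
        }
    }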
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
new file mode 100644
index 00000000..7cc17ded
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Copied from Flink 2.0 to handle removed fields. */
+public class JobExceptionsInfoWithHistory implements ResponseBody {
+
+    public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory";
+
+    @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+    private final JobExceptionHistory exceptionHistory;
+
+    @JsonCreator
+    public JobExceptionsInfoWithHistory(
+            @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory exceptionHistory) {
+        this.exceptionHistory = exceptionHistory;
+    }
+
+    @JsonIgnore
+    public JobExceptionHistory getExceptionHistory() {
+        return exceptionHistory;
+    }
+
+    // hashCode and equals are necessary for the test classes deriving from
+    // RestResponseMarshallingTestBase
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
+        return Objects.equals(exceptionHistory, that.exceptionHistory);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(exceptionHistory);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+                .add("exceptionHistory=" + exceptionHistory)
+                .toString();
+    }
+
+    /** {@code JobExceptionHistory} collects all previously caught errors. */
+    public static final class JobExceptionHistory {
+
+        public static final String FIELD_NAME_ENTRIES = "entries";
+        public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+        @JsonProperty(FIELD_NAME_ENTRIES)
+        private final List<RootExceptionInfo> entries;
+
+        @JsonProperty(FIELD_NAME_TRUNCATED)
+        private final boolean truncated;
+
+        @JsonCreator
+        public JobExceptionHistory(
+                @JsonProperty(FIELD_NAME_ENTRIES) List<RootExceptionInfo> entries,
+                @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+            this.entries = entries;
+            this.truncated = truncated;
+        }
+
+        @JsonIgnore
+        public List<RootExceptionInfo> getEntries() {
+            return entries;
+        }
+
+        @JsonIgnore
+        public boolean isTruncated() {
+            return truncated;
+        }
+
+        // hashCode and equals are necessary for the test classes deriving from
+        // RestResponseMarshallingTestBase
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JobExceptionHistory that = (JobExceptionHistory) o;
+            return this.isTruncated() == that.isTruncated()
+                    && Objects.equals(entries, that.entries);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(entries, truncated);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", JobExceptionHistory.class.getSimpleName() + "[", "]")
+                    .add("entries=" + entries)
+                    .add("truncated=" + truncated)
+                    .toString();
+        }
+    }
+
+    /**
+     * Json equivalent of {@link
+     * org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry}.
+     */
+    public static class ExceptionInfo {
+
+        public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
+        public static final String FIELD_NAME_EXCEPTION_STACKTRACE = "stacktrace";
+        public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = "timestamp";
+        public static final String FIELD_NAME_TASK_NAME = "taskName";
+        public static final String FIELD_NAME_ENDPOINT = "endpoint";
+        public static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId";
+        public static final String FIELD_NAME_FAILURE_LABELS = "failureLabels";
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_NAME)
+        private final String exceptionName;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE)
+        private final String stacktrace;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP)
+        private final long timestamp;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_TASK_NAME)
+        @Nullable
+        private final String taskName;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_ENDPOINT)
+        @Nullable
+        private final String endpoint;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
+        @Nullable
+        private final String taskManagerId;
+
+        @JsonProperty(FIELD_NAME_FAILURE_LABELS)
+        private final Map<String, String> failureLabels;
+
+        public ExceptionInfo(String exceptionName, String stacktrace, long timestamp) {
+            this(exceptionName, stacktrace, timestamp, Collections.emptyMap(), null, null, null);
+        }
+
+        @JsonCreator
+        public ExceptionInfo(
+                @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+                @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
+                @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+                @JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> failureLabels,
+                @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+                @JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
+                @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId) {
+            this.exceptionName = checkNotNull(exceptionName);
+            this.stacktrace = checkNotNull(stacktrace);
+            this.timestamp = timestamp;
+            this.failureLabels = checkNotNull(failureLabels);
+            this.taskName = taskName;
+            this.endpoint = endpoint;
+            this.taskManagerId = taskManagerId;
+        }
+
+        @JsonIgnore
+        public String getExceptionName() {
+            return exceptionName;
+        }
+
+        @JsonIgnore
+        public String getStacktrace() {
+            return stacktrace;
+        }
+
+        @JsonIgnore
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getTaskName() {
+            return taskName;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getEndpoint() {
+            return endpoint;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getTaskManagerId() {
+            return taskManagerId;
+        }
+
+        @JsonIgnore
+        public Map<String, String> getFailureLabels() {
+            return failureLabels;
+        }
+
+        // hashCode and equals are necessary for the test classes deriving from
+        // RestResponseMarshallingTestBase
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ExceptionInfo that = (ExceptionInfo) o;
+            return exceptionName.equals(that.exceptionName)
+                    && stacktrace.equals(that.stacktrace)
+                    && Objects.equals(timestamp, that.timestamp)
+                    && Objects.equals(failureLabels, that.failureLabels)
+                    && Objects.equals(taskName, that.taskName)
+                    && Objects.equals(endpoint, that.endpoint);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    exceptionName, stacktrace, timestamp, failureLabels, taskName, endpoint);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() + "[", "]")
+                    .add("exceptionName='" + exceptionName + "'")
+                    .add("stacktrace='" + stacktrace + "'")
+                    .add("timestamp=" + timestamp)
+                    .add("failureLabels=" + failureLabels)
+                    .add("taskName='" + taskName + "'")
+                    .add("endpoint='" + endpoint + "'")
+                    .toString();
+        }
+    }
+
+    /**
+     * Json equivalent of {@link
+     * org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry}.
+     */
+    public static class RootExceptionInfo extends ExceptionInfo {
+
+        public static final String FIELD_NAME_CONCURRENT_EXCEPTIONS = "concurrentExceptions";
+
+        @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
+        private final Collection<ExceptionInfo> concurrentExceptions;
+
+        public RootExceptionInfo(
+                String exceptionName,
+                String stacktrace,
+                long timestamp,
+                Map<String, String> failureLabels,
+                Collection<ExceptionInfo> concurrentExceptions) {
+            this(
+                    exceptionName,
+                    stacktrace,
+                    timestamp,
+                    failureLabels,
+                    null,
+                    null,
+                    null,
+                    concurrentExceptions);
+        }
+
+        @JsonCreator
+        public RootExceptionInfo(
+                @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+                @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
+                @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+                @JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> failureLabels,
+                @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+                @JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
+                @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId,
+                @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
+                        Collection<ExceptionInfo> concurrentExceptions) {
+            super(
+                    exceptionName,
+                    stacktrace,
+                    timestamp,
+                    failureLabels,
+                    taskName,
+                    endpoint,
+                    taskManagerId);
+            this.concurrentExceptions = concurrentExceptions;
+        }
+
+        @JsonIgnore
+        public Collection<ExceptionInfo> getConcurrentExceptions() {
+            return concurrentExceptions;
+        }
+
+        // hashCode and equals are necessary for the test classes deriving from
+        // RestResponseMarshallingTestBase
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+                return false;
+            }
+            RootExceptionInfo that = (RootExceptionInfo) o;
+            return getConcurrentExceptions().equals(that.getConcurrentExceptions());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), getConcurrentExceptions());
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", RootExceptionInfo.class.getSimpleName() + "[", "]")
+                    .add("exceptionName='" + getExceptionName() + "'")
+                    .add("stacktrace='" + getStacktrace() + "'")
+                    .add("timestamp=" + getTimestamp())
+                    .add("taskName='" + getTaskName() + "'")
+                    .add("endpoint='" + getEndpoint() + "'")
+                    .add("concurrentExceptions=" + getConcurrentExceptions())
+                    .toString();
+        }
+    }
+}
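[Editor's note, not part of the commit: since the copied DTO keeps Flink's shaded Jackson
annotations, the 2.0 REST payload binds to it directly. A minimal sketch, assuming the
flink-shaded Jackson ObjectMapper is on the classpath; the JSON values are samples.]

    import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    public class ExceptionHistoryParseSketch {
        public static void main(String[] args) throws Exception {
            // Abbreviated /jobs/:jobid/exceptions response (hypothetical sample values)
            String json =
                    "{\"exceptionHistory\":{\"entries\":[{"
                            + "\"exceptionName\":\"java.lang.RuntimeException\","
                            + "\"stacktrace\":\"java.lang.RuntimeException: boom\","
                            + "\"timestamp\":1716800000000,"
                            + "\"failureLabels\":{},"
                            + "\"concurrentExceptions\":[]}],"
                            + "\"truncated\":false}}";
            var history =
                    new ObjectMapper()
                            .readValue(json, JobExceptionsInfoWithHistory.class)
                            .getExceptionHistory();
            // Entries carry the epoch-millis timestamps the observer sorts on
            System.out.println(history.getEntries().get(0).getTimestamp());
        }
    }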
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 9b7bc2e2..bee769fb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -749,10 +749,6 @@ public class TestingFlinkService extends AbstractFlinkService {
                         stackTrace,
                         timestamp,
                         Map.of("label-key", "label-value"),
-                        "task-name-1",
-                        "location-1",
-                        "endpoint-1",
-                        "tm-id-1",
                         List.of() // concurrentExceptions
                         );
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index e40ae74e..4f89d04b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -43,6 +43,9 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -167,8 +170,9 @@ public class JobStatusObserverTest extends OperatorTestBase {
                 getResourceContext(deployment, operatorConfig);
 
         var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
         flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
 
         // Ensure jobFailedErr is null before the observe call
@@ -210,8 +214,9 @@ public class JobStatusObserverTest extends OperatorTestBase {
                 getResourceContext(deployment, operatorConfig);
 
         var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
         flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
 
         // Ensure jobFailedErr is null before the observe call
@@ -247,8 +252,9 @@ public class JobStatusObserverTest extends OperatorTestBase {
                 getResourceContext(deployment, operatorConfig);
 
         // set a non-terminal state
         var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
         flinkService.submitApplicationCluster(
                 deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
@@ -289,8 +295,9 @@ public class JobStatusObserverTest extends OperatorTestBase {
         flinkService.submitApplicationCluster(
                 deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
-        ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
 
         long exceptionTime = 4000L;
         String longTrace = "line1\nline2\nline3\nline4";
@@ -323,8 +330,9 @@ public class JobStatusObserverTest extends OperatorTestBase {
         jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
         FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
 
+        ctx.getExceptionCacheEntry().setInitialized(true);
         ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
-        ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));
 
         var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
         flinkService.submitApplicationCluster(
@@ -365,6 +373,51 @@ public class JobStatusObserverTest extends OperatorTestBase {
         assertTrue(events.get(0).getMessage().contains("org.apache.NewException"));
     }
 
+    @Test
+    public void testExceptionEventTriggerInitialization() throws Exception {
+        var deployment = initDeployment();
+        var status = deployment.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+
+        FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
+
+        var now = Instant.now();
+        var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+        flinkService.submitApplicationCluster(
+                deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
+
+        // Old exceptions that happened outside of the kubernetes event retention window
+        // should be ignored
+        flinkService.addExceptionHistory(
+                jobId,
+                "OldException",
+                "OldException",
+                now.minus(Duration.ofHours(1)).toEpochMilli());
+        flinkService.addExceptionHistory(
+                jobId,
+                "NewException",
+                "NewException",
+                now.minus(Duration.ofMinutes(1)).toEpochMilli());
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(deployment.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(1, events.size());
+        assertTrue(events.get(0).getMessage().contains("NewException"));
+        assertTrue(ctx.getExceptionCacheEntry().isInitialized());
+        assertEquals(
+                now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS),
+                ctx.getExceptionCacheEntry().getLastTimestamp());
+    }
+
     private static Stream<Arguments> cancellingArgs() {
         var args = new ArrayList<Arguments>();
         for (var status : JobStatus.values()) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
similarity index 69%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
index d4e57809..02616004 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsApiServerTest.java
@@ -18,9 +18,11 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 
 import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
 import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodCondition;
@@ -28,16 +30,18 @@ import io.fabric8.kubernetes.api.model.PodConditionBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** Test for {@link EventUtils}. */
 @EnableKubeAPIServer
-public class PodErrorTest {
+public class EventUtilsApiServerTest {
 
     static KubernetesClient client;
 
@@ -145,4 +149,54 @@ public class EventUtilsApiServerTest {
                         .build();
         client.resource(event).create();
     }
+
+    @Test
+    public void testFindingLatestJobExceptionTs() throws Exception {
+        // Testing with pods instead of FlinkDeployments due to local Kube API server limitations
+        var pod =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withName("test2")
+                        .withNamespace("default")
+                        .endMetadata()
+                        .withNewStatus()
+                        .endStatus()
+                        .build();
+
+        assertEquals(Optional.empty(), EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+
+        var now = Instant.now();
+        createExceptionEvent(pod, "ex1", now);
+        assertEquals(Optional.of(now), EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+
+        createExceptionEvent(pod, "ex2", now.plus(Duration.ofSeconds(1)));
+        createExceptionEvent(pod, "ex3", now.minus(Duration.ofSeconds(1)));
+
+        assertEquals(
+                Optional.of(now.plus(Duration.ofSeconds(1))),
+                EventUtils.findLastJobExceptionTsFromK8s(client, pod));
+    }
+
+    private static void createExceptionEvent(HasMetadata resource, String name, Instant now) {
+        var event =
+                new EventBuilder()
+                        .withApiVersion("v1")
+                        .withInvolvedObject(EventUtils.getObjectReference(resource))
+                        .withType("type")
+                        .withReason(EventRecorder.Reason.JobException.name())
+                        .withFirstTimestamp(Instant.now().toString())
+                        .withLastTimestamp(Instant.now().toString())
+                        .withNewSource()
+                        .withComponent(EventRecorder.Component.Job.name())
+                        .endSource()
+                        .withCount(1)
+                        .withMessage("m")
+                        .withNewMetadata()
+                        .withName(name)
+                        .withNamespace(resource.getMetadata().getNamespace())
+                        .addToAnnotations(JobStatusObserver.EXCEPTION_TIMESTAMP, now.toString())
+                        .endMetadata()
+                        .build();
+        client.resource(event).create();
+    }
+}