XComp commented on a change in pull request #15049:
URL: https://github.com/apache/flink/pull/15049#discussion_r592482131
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
##########
@@ -37,68 +36,258 @@
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import static
org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.taskManagerLocationToString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
/** Test for the {@link JobExceptionsHandler}. */
public class JobExceptionsHandlerTest extends TestLogger {
+ private final JobExceptionsHandler testInstance =
+ new JobExceptionsHandler(
+ CompletableFuture::new,
+ TestingUtils.TIMEOUT(),
+ Collections.emptyMap(),
+ JobExceptionsHeaders.getInstance(),
+ new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(),
TestingUtils.TIMEOUT()),
+ TestingUtils.defaultExecutor());
+
+ @Test
+ public void testNoExceptions() throws HandlerRequestException {
+ final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(new
ArchivedExecutionGraphBuilder().build());
+
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response.getRootException(), is(nullValue()));
+ assertThat(response.getRootTimestamp(), is(nullValue()));
+ assertFalse(response.isTruncated());
+ assertThat(response.getAllExceptions(), empty());
+ assertThat(response, hasHistoryEntryCount(0));
+ }
+
+ @Test
+ public void testOnlyRootCause() throws HandlerRequestException {
+ final Throwable rootCause = new RuntimeException("root cause");
+ final long rootCauseTimestamp = System.currentTimeMillis();
+
+ final ExecutionGraphInfo executionGraphInfo =
+ createExecutionGraphInfo(
+ ExceptionHistoryEntry.fromGlobalFailure(rootCause,
rootCauseTimestamp));
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause, rootCauseTimestamp));
+ assertFalse(response.isTruncated());
+ assertThat(response.getAllExceptions(), empty());
+
+ assertThat(response, hasHistoryEntryCount(1));
+ assertThat(response, containsGlobalFailure(rootCause,
rootCauseTimestamp));
+ }
+
+ @Test
+ public void testWithExceptionHistory() throws HandlerRequestException {
+ final ExceptionHistoryEntry rootCause =
+ ExceptionHistoryEntry.fromGlobalFailure(
+ new RuntimeException("exception #0"),
System.currentTimeMillis());
+ final ExceptionHistoryEntry otherFailure =
+ new ExceptionHistoryEntry(
+ new RuntimeException("exception #1"),
+ System.currentTimeMillis(),
+ "task name",
+ new LocalTaskManagerLocation());
+
+ final ExecutionGraphInfo executionGraphInfo =
+ createExecutionGraphInfo(rootCause, otherFailure);
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ containsGlobalFailure(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ historyContainsJobExceptionInfo(
+ otherFailure.getException(),
+ otherFailure.getTimestamp(),
+ otherFailure.getFailingTaskName(),
+
taskManagerLocationToString(otherFailure.getAssignedResourceLocation())));
+ assertThat(response, hasHistoryEntryCount(2));
+ assertThat(response, not(hasTruncatedHistory()));
+ assertThat(response, hasTotal(2));
+ }
+
+ @Test
+ public void testWithExceptionHistoryWithTruncationThroughParameter()
+ throws HandlerRequestException {
+ final ExceptionHistoryEntry rootCause =
+ ExceptionHistoryEntry.fromGlobalFailure(
+ new RuntimeException("exception #0"),
System.currentTimeMillis());
+ final ExceptionHistoryEntry otherFailure =
+ new ExceptionHistoryEntry(
+ new RuntimeException("exception #1"),
+ System.currentTimeMillis(),
+ "task name",
+ new LocalTaskManagerLocation());
+
+ final ExecutionGraphInfo executionGraphInfo =
+ createExecutionGraphInfo(rootCause, otherFailure);
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 1);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ containsGlobalFailure(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(response, hasHistoryEntryCount(1));
+ assertThat(response, hasTruncatedHistory());
+ assertThat(response, hasTotal(2));
+ }
+
+ @Test
+ public void testWithExceptionHistoryWithInternalTruncation() throws
HandlerRequestException {
+ final ExceptionHistoryEntry rootCause =
+ ExceptionHistoryEntry.fromGlobalFailure(
+ new RuntimeException("exception #0"),
System.currentTimeMillis());
+
+ final ExecutionGraphInfo executionGraphInfo =
createExecutionGraphInfo(rootCause);
+ // adding the rootCause a second time here will result in the underlying
+ // BoundedFIFOQueue being truncated
+ executionGraphInfo.getExceptionHistory().add(rootCause);
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ containsGlobalFailure(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(response, hasHistoryEntryCount(1));
+ assertThat(response, not(hasTruncatedHistory()));
Review comment:
This test was removed as part of the `elementCount` revert
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]