This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new adc9e41 [FLINK-23034][runtime] Added compatibility for ExecutionState in HistoryServer
adc9e41 is described below
commit adc9e4125ff97b637ef38ae4071fa17cd5dcfa91
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Fri Jun 18 15:26:10 2021 +0300
[FLINK-23034][runtime] Added compatibility for ExecutionState in HistoryServer
---
.../runtime/messages/webmonitor/JobDetails.java | 4 +-
.../messages/webmonitor/JobDetailsTest.java | 46 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index ee2fa84..f9b609a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -304,8 +304,10 @@ public class JobDetails implements Serializable {
int[] numVerticesPerExecutionState = new
int[ExecutionState.values().length];
for (ExecutionState executionState : ExecutionState.values()) {
+ JsonNode jsonNode =
tasksNode.get(executionState.name().toLowerCase());
+
numVerticesPerExecutionState[executionState.ordinal()] =
-
tasksNode.get(executionState.name().toLowerCase()).intValue();
+ jsonNode == null ? 0 : jsonNode.intValue();
}
return new JobDetails(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 54d7219..0ab6ddc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,10 +29,34 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.Test;
+import java.io.IOException;
+
import static org.junit.Assert.assertEquals;
/** Tests for the {@link JobDetails}. */
public class JobDetailsTest extends TestLogger {
+ private static final String COMPATIBLE_JOB_DETAILS =
+ "{"
+ + " \"jid\" : \"7a7c3291accebd10b6be8d4f8c8d8dfc\","
+ + " \"name\" : \"foobar\","
+ + " \"state\" : \"RUNNING\","
+ + " \"start-time\" : 1,"
+ + " \"end-time\" : 10,"
+ + " \"duration\" : 9,"
+ + " \"last-modification\" : 8,"
+ + " \"tasks\" : {"
+ + " \"total\" : 42,"
+ + " \"created\" : 1,"
+ + " \"scheduled\" : 3,"
+ + " \"deploying\" : 3,"
+ + " \"running\" : 4,"
+ + " \"finished\" : 7,"
+ + " \"canceling\" : 4,"
+ + " \"canceled\" : 2,"
+ + " \"failed\" : 7,"
+ + " \"reconciling\" : 3"
+ + " }"
+ + "}";
/** Tests that we can marshal and unmarshal JobDetails instances. */
@Test
@@ -57,4 +81,26 @@ public class JobDetailsTest extends TestLogger {
assertEquals(expected, unmarshalled);
}
+
+ @Test
+ public void testJobDetailsCompatibleUnmarshalling() throws IOException {
+ final JobDetails expected =
+ new JobDetails(
+
JobID.fromHexString("7a7c3291accebd10b6be8d4f8c8d8dfc"),
+ "foobar",
+ 1L,
+ 10L,
+ 9L,
+ JobStatus.RUNNING,
+ 8L,
+ new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 0},
+ 42);
+
+ final ObjectMapper objectMapper =
RestMapperUtils.getStrictObjectMapper();
+
+ final JobDetails unmarshalled =
+ objectMapper.readValue(COMPATIBLE_JOB_DETAILS,
JobDetails.class);
+
+ assertEquals(expected, unmarshalled);
+ }
}