[hotfix][history] Read/Write MultipleJobsDetails instead of manual JSON
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b6603f9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b6603f9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b6603f9 Branch: refs/heads/master Commit: 6b6603f91ac17ab02a7147d96a81297067245ac9 Parents: 2cef5fd Author: zentol <[email protected]> Authored: Tue Apr 24 10:26:32 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Mon May 14 23:40:48 2018 +0200 ---------------------------------------------------------------------- .../history/HistoryServerArchiveFetcher.java | 19 ++++++++----------- .../webmonitor/history/HistoryServerTest.java | 8 +++----- 2 files changed, 11 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b6603f9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 413473b..ac19197 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -24,6 +24,8 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.util.ExecutorThreadFactory; @@ -31,7 +33,6 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -42,6 +43,8 @@ import java.io.FileWriter; import java.io.IOException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -233,21 +236,15 @@ class HistoryServerArchiveFetcher { */ private static void updateJobOverview(File webOverviewDir, File webDir) { try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) { - gen.writeStartObject(); - gen.writeArrayFieldStart("jobs"); - File[] overviews = new File(webOverviewDir.getPath()).listFiles(); if (overviews != null) { + Collection<JobDetails> allJobs = new ArrayList<>(overviews.length); for (File overview : overviews) { - JsonNode root = mapper.readTree(overview); - JsonNode finished = root.get("jobs"); - JsonNode job = finished.get(0); - mapper.writeTree(gen, job); + MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class); + allJobs.addAll(subJobs.getJobs()); } + mapper.writeValue(gen, new MultipleJobsDetails(allJobs)); } - - gen.writeEndArray(); - gen.writeEndObject(); } catch (IOException ioe) { LOG.error("Failed to update job overview.", ioe); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b6603f9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index de63b43..a16f6fb 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -27,11 +27,11 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.ArchiveMessages; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import akka.actor.ActorRef; @@ -94,11 +94,9 @@ public class HistoryServerTest extends TestLogger { ObjectMapper mapper = new ObjectMapper(); String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL); - JsonNode overview = mapper.readTree(response); + MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class); - String jobID = overview.get("jobs").get(0).get("jid").asText(); - JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID)); - Assert.assertNotNull(jobDetails.get("jid")); + Assert.assertEquals(1, overview.getJobs().size()); } finally { hs.stop(); }
