[FLINK-9194] Add support for legacy history server formats
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69583800
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69583800
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69583800

Branch: refs/heads/master
Commit: 6958380038b82f43ba44ebb3e0af8091399da93b
Parents: 6d8cc73
Author: zentol <[email protected]>
Authored: Tue May 8 13:13:59 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java    | 49 +++++++++++
 .../webmonitor/history/HistoryServerTest.java   | 88 +++++++++++++++++++-
 2 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69583800/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index ac19197..5acbe54 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
@@ -33,6 +35,7 @@ import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.slf4j.Logger;
@@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -168,6 +173,9 @@ class HistoryServerArchiveFetcher {
 						File target;
 						if (path.equals(JobsOverviewHeaders.URL)) {
 							target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+						} else if (path.equals("/joboverview")) { // legacy path
+							json = convertLegacyJobOverview(json);
+							target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 						} else {
 							target = new File(webDir, path + JSON_FILE_ENDING);
 						}
@@ -225,6 +233,47 @@ class HistoryServerArchiveFetcher {
 		}
 	}
 
+	private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
+		JsonNode root = mapper.readTree(legacyOverview);
+		JsonNode finishedJobs = root.get("finished");
+		JsonNode job = finishedJobs.get(0);
+
+		JobID jobId = JobID.fromHexString(job.get("jid").asText());
+		String name = job.get("name").asText();
+		JobStatus state = JobStatus.valueOf(job.get("state").asText());
+
+		long startTime = job.get("start-time").asLong();
+		long endTime = job.get("end-time").asLong();
+		long duration = job.get("duration").asLong();
+		long lastMod = job.get("last-modification").asLong();
+
+		JsonNode tasks = job.get("tasks");
+		int numTasks = tasks.get("total").asInt();
+		int pending = tasks.get("pending").asInt();
+		int running = tasks.get("running").asInt();
+		int finished = tasks.get("finished").asInt();
+		int canceling = tasks.get("canceling").asInt();
+		int canceled = tasks.get("canceled").asInt();
+		int failed = tasks.get("failed").asInt();
+
+		int[] tasksPerState = new int[ExecutionState.values().length];
+		// pending is a mix of CREATED/SCHEDULED/DEPLOYING
+		// to maintain the correct number of task states we have to pick one of them
+		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+		tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
+		tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
+		tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
+		tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
+		tasksPerState[ExecutionState.FAILED.ordinal()] = failed;
+
+		JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
+		MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));
+
+		StringWriter sw = new StringWriter();
+		mapper.writeValue(sw, multipleJobsDetails);
+		return sw.toString();
+	}
+
 	/**
 	 * This method replicates the JSON response that would be given by the {@link JobsOverviewHandler} when
 	 * listing both running and finished jobs.
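Note: for reference, the legacy "/joboverview" payload consumed by convertLegacyJobOverview() above has roughly the shape sketched below. The field names are the ones the converter reads; the concrete values, the job id, and the small harness around Jackson are illustrative only and not part of this commit.

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    // Illustrative sketch: mirrors the fields read by convertLegacyJobOverview.
    public class LegacyOverviewSample {
        public static void main(String[] args) throws Exception {
            String legacyOverview =
                "{\"finished\":[{"
                + "\"jid\":\"2f4bc854a1b540e0b50d0d2d018e4f10\"," // hypothetical job id
                + "\"name\":\"testjob\","
                + "\"state\":\"FINISHED\","
                + "\"start-time\":0,\"end-time\":1,\"duration\":1,\"last-modification\":1,"
                + "\"tasks\":{\"total\":4,\"pending\":0,\"running\":0,\"finished\":4,"
                + "\"canceling\":0,\"canceled\":0,\"failed\":0}}]}";

            JsonNode job = new ObjectMapper().readTree(legacyOverview).get("finished").get(0);
            // The converter maps these fields onto JobDetails/MultipleJobsDetails, i.e. the
            // format served under JobsOverviewHeaders.URL by the new REST API.
            System.out.println(job.get("jid").asText() + " -> " + job.get("state").asText());
        }
    }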
http://git-wip-us.apache.org/repos/asf/flink/blob/69583800/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 580d80f..9407af2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.commons.io.IOUtils;
@@ -38,12 +42,18 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils.JACKSON_FACTORY;
+
 /**
  * Tests for the HistoryServer.
  */
@@ -88,6 +98,7 @@ public class HistoryServerTest extends TestLogger {
 		for (int x = 0; x < numJobs; x++) {
 			runJob();
 		}
+		createLegacyArchive(jmDirectory.toPath());
 
 		CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
@@ -99,7 +110,7 @@ public class HistoryServerTest extends TestLogger {
 
 		// the job is archived asynchronously after env.execute() returns
 		File[] archives = jmDirectory.listFiles();
-		while (archives == null || archives.length != numJobs) {
+		while (archives == null || archives.length != numJobs + 1) {
 			Thread.sleep(50);
 			archives = jmDirectory.listFiles();
 		}
@@ -114,7 +125,7 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			Assert.assertEquals(numJobs, overview.getJobs().size());
+			Assert.assertEquals(numJobs + 1, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}
@@ -143,4 +154,77 @@ public class HistoryServerTest extends TestLogger {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
+
+	private static void createLegacyArchive(Path directory) throws IOException {
+		JobID jobID = JobID.generate();
+
+		StringWriter sw = new StringWriter();
+		try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
+			try (JsonObject root = new JsonObject(gen)) {
+				try (JsonArray finished = new JsonArray(gen, "finished")) {
+					try (JsonObject job = new JsonObject(gen)) {
+						gen.writeStringField("jid", jobID.toString());
+						gen.writeStringField("name", "testjob");
+						gen.writeStringField("state", JobStatus.FINISHED.name());
+
+						gen.writeNumberField("start-time", 0L);
+						gen.writeNumberField("end-time", 1L);
+						gen.writeNumberField("duration", 1L);
+						gen.writeNumberField("last-modification", 1L);
+
+						try (JsonObject tasks = new JsonObject(gen, "tasks")) {
+							gen.writeNumberField("total", 0);
+
+							gen.writeNumberField("pending", 0);
+							gen.writeNumberField("running", 0);
+							gen.writeNumberField("finished", 0);
+							gen.writeNumberField("canceling", 0);
+							gen.writeNumberField("canceled", 0);
+							gen.writeNumberField("failed", 0);
+						}
+					}
+				}
+			}
+		}
+		String json = sw.toString();
+
+		ArchivedJson archivedJson = new ArchivedJson("/joboverview", json);
+
+		FsJobArchivist.archiveJob(new org.apache.flink.core.fs.Path(directory.toUri()), jobID, Collections.singleton(archivedJson));
+	}
+
+	private static final class JsonObject implements AutoCloseable {
+
+		private final JsonGenerator gen;
+
+		JsonObject(JsonGenerator gen) throws IOException {
+			this.gen = gen;
+			gen.writeStartObject();
+		}
+
+		private JsonObject(JsonGenerator gen, String name) throws IOException {
+			this.gen = gen;
+			gen.writeObjectFieldStart(name);
+		}
+
+		@Override
+		public void close() throws IOException {
+			gen.writeEndObject();
+		}
+	}
+
+	private static final class JsonArray implements AutoCloseable {
+
+		private final JsonGenerator gen;
+
+		JsonArray(JsonGenerator gen, String name) throws IOException {
+			this.gen = gen;
+			gen.writeArrayFieldStart(name);
+		}
+
+		@Override
+		public void close() throws IOException {
+			gen.writeEndArray();
+		}
+	}
 }
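Note: as a usage sketch (not part of this commit), a HistoryServer pointed at an archive directory should now serve both current archives and legacy "/joboverview" archives through the same jobs overview endpoint. The snippet below assumes the existing HistoryServerOptions keys and the HistoryServer(Configuration) constructor; the directory paths are placeholders.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HistoryServerOptions;
    import org.apache.flink.runtime.webmonitor.history.HistoryServer;

    public class HistoryServerSketch {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // Directory holding job archives written by old and new JobManagers (placeholder path).
            config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "file:///tmp/flink-archives");
            // Local working directory for the static files served by the HistoryServer (placeholder path).
            config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, "/tmp/flink-hs-web");

            HistoryServer historyServer = new HistoryServer(config);
            historyServer.start();
            // ... fetch the jobs overview (JobsOverviewHeaders.URL) from the configured web port ...
            historyServer.stop();
        }
    }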
