[FLINK-9194] Add support for legacy history server formats

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8b3ec8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8b3ec8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8b3ec8

Branch: refs/heads/release-1.5
Commit: 8d8b3ec8881bbe3838c4596fb50988a0142e6bbb
Parents: 7ba41bc
Author: zentol <[email protected]>
Authored: Tue May 8 13:13:59 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java    | 49 +++++++++++
 .../webmonitor/history/HistoryServerTest.java   | 88 +++++++++++++++++++-
 2 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d8b3ec8/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index ac19197..5acbe54 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
@@ -33,6 +35,7 @@ import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.slf4j.Logger;
@@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -168,6 +173,9 @@ class HistoryServerArchiveFetcher {
                                                                        File 
target;
                                                                        if 
(path.equals(JobsOverviewHeaders.URL)) {
                                                                                
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+                                                                       } else 
if (path.equals("/joboverview")) { // legacy path
+                                                                               
json = convertLegacyJobOverview(json);
+                                                                               
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                                                        } else {
                                                                                
target = new File(webDir, path + JSON_FILE_ENDING);
                                                                        }
@@ -225,6 +233,47 @@ class HistoryServerArchiveFetcher {
                }
        }
 
+       /**
+        * Converts the JSON stored under the legacy "/joboverview" archive path into
+        * the JSON serialization of a {@link MultipleJobsDetails} that contains a
+        * single {@link JobDetails}.
+        *
+        * <p>Only the first element of the "finished" array is read; the job and task
+        * fields are copied over one by one.
+        *
+        * <p>NOTE(review): no field is checked for presence. JsonNode#get presumably
+        * returns null for a missing field or index, so a malformed legacy archive
+        * would surface as a NullPointerException here -- confirm that the caller
+        * tolerates that.
+        *
+        * @param legacyOverview legacy job overview as a JSON string
+        * @return JSON string written by the shared mapper for the rebuilt MultipleJobsDetails
+        * @throws IOException propagated from the mapper's readTree/writeValue calls
+        */
+       private static String convertLegacyJobOverview(String legacyOverview) 
throws IOException {
+               JsonNode root = mapper.readTree(legacyOverview);
+               JsonNode finishedJobs = root.get("finished");
+               // only the first entry of the "finished" array is converted
+               JsonNode job = finishedJobs.get(0);
+
+               JobID jobId = JobID.fromHexString(job.get("jid").asText());
+               String name = job.get("name").asText();
+               JobStatus state = JobStatus.valueOf(job.get("state").asText());
+
+               long startTime = job.get("start-time").asLong();
+               long endTime = job.get("end-time").asLong();
+               long duration = job.get("duration").asLong();
+               long lastMod = job.get("last-modification").asLong();
+
+               JsonNode tasks = job.get("tasks");
+               int numTasks = tasks.get("total").asInt();
+               int pending = tasks.get("pending").asInt();
+               int running = tasks.get("running").asInt();
+               int finished = tasks.get("finished").asInt();
+               int canceling = tasks.get("canceling").asInt();
+               int canceled = tasks.get("canceled").asInt();
+               int failed = tasks.get("failed").asInt();
+
+               // one slot per ExecutionState, indexed by ordinal; states without a
+               // legacy counter keep the array's default value of 0
+               int[] tasksPerState = new int[ExecutionState.values().length];
+               // pending is a mix of CREATED/SCHEDULED/DEPLOYING
+               // to maintain the correct number of task states we have to 
pick one of them
+               tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+               tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
+               tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
+               tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
+               tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
+               tasksPerState[ExecutionState.FAILED.ordinal()] = failed;
+
+               JobDetails jobDetails = new JobDetails(jobId, name, startTime, 
endTime, duration, state, lastMod, tasksPerState, numTasks);
+               MultipleJobsDetails multipleJobsDetails = new 
MultipleJobsDetails(Collections.singleton(jobDetails));
+
+               // serialize into an in-memory string instead of a file
+               StringWriter sw = new StringWriter();
+               mapper.writeValue(sw, multipleJobsDetails);
+               return sw.toString();
+       }
+
        /**
         * This method replicates the JSON response that would be given by the 
{@link JobsOverviewHandler} when
         * listing both running and finished jobs.

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8b3ec8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 580d80f..9407af2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.commons.io.IOUtils;
@@ -38,12 +42,18 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils.JACKSON_FACTORY;
+
 /**
  * Tests for the HistoryServer.
  */
@@ -88,6 +98,7 @@ public class HistoryServerTest extends TestLogger {
                for (int x = 0; x < numJobs; x++) {
                        runJob();
                }
+               createLegacyArchive(jmDirectory.toPath());
 
                CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
@@ -99,7 +110,7 @@ public class HistoryServerTest extends TestLogger {
 
                // the job is archived asynchronously after env.execute() 
returns
                File[] archives = jmDirectory.listFiles();
-               while (archives == null || archives.length != numJobs) {
+               while (archives == null || archives.length != numJobs + 1) {
                        Thread.sleep(50);
                        archives = jmDirectory.listFiles();
                }
@@ -114,7 +125,7 @@ public class HistoryServerTest extends TestLogger {
                        String response = getFromHTTP(baseUrl + 
JobsOverviewHeaders.URL);
                        MultipleJobsDetails overview = 
mapper.readValue(response, MultipleJobsDetails.class);
 
-                       Assert.assertEquals(numJobs, overview.getJobs().size());
+                       Assert.assertEquals(numJobs + 1, 
overview.getJobs().size());
                } finally {
                        hs.stop();
                }
@@ -143,4 +154,77 @@ public class HistoryServerTest extends TestLogger {
 
                return IOUtils.toString(is, connection.getContentEncoding() != 
null ? connection.getContentEncoding() : "UTF-8");
        }
+
+       /**
+        * Creates a job archive in the legacy format for a freshly generated
+        * {@link JobID}.
+        *
+        * <p>The generated JSON has a single job object inside a "finished" array
+        * (jid, name, state, timestamps and a "tasks" object whose counters are all 0).
+        * It is wrapped in an {@link ArchivedJson} under the legacy path "/joboverview"
+        * and handed to {@link FsJobArchivist#archiveJob}.
+        *
+        * <p>The archive presumably ends up as one extra file in the given directory;
+        * the calling test waits for numJobs + 1 archives there.
+        *
+        * @param directory directory whose URI is passed to the archivist as root path
+        * @throws IOException declared for the JSON generation and the archiveJob call
+        */
+       private static void createLegacyArchive(Path directory) throws 
IOException {
+               JobID jobID = JobID.generate();
+
+               StringWriter sw = new StringWriter();
+               try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
+                       try (JsonObject root = new JsonObject(gen)) {
+                               try (JsonArray finished = new JsonArray(gen, 
"finished")) {
+                                       try (JsonObject job = new 
JsonObject(gen)) {
+                                               gen.writeStringField("jid", 
jobID.toString());
+                                               gen.writeStringField("name", 
"testjob");
+                                               gen.writeStringField("state", 
JobStatus.FINISHED.name());
+
+                                               
gen.writeNumberField("start-time", 0L);
+                                               
gen.writeNumberField("end-time", 1L);
+                                               
gen.writeNumberField("duration", 1L);
+                                               
gen.writeNumberField("last-modification", 1L);
+
+                                               try (JsonObject tasks = new 
JsonObject(gen, "tasks")) {
+                                                       
gen.writeNumberField("total", 0);
+
+                                                       
gen.writeNumberField("pending", 0);
+                                                       
gen.writeNumberField("running", 0);
+                                                       
gen.writeNumberField("finished", 0);
+                                                       
gen.writeNumberField("canceling", 0);
+                                                       
gen.writeNumberField("canceled", 0);
+                                                       
gen.writeNumberField("failed", 0);
+                                               }
+                                       }
+                               }
+                       }
+               }
+               // the generator is closed at this point, so the writer holds the full document
+               String json = sw.toString();
+
+               ArchivedJson archivedJson = new ArchivedJson("/joboverview", 
json);
+
+               FsJobArchivist.archiveJob(new 
org.apache.flink.core.fs.Path(directory.toUri()), jobID, 
Collections.singleton(archivedJson));
+       }
+
+       /**
+        * Test helper that writes the start of a JSON object when constructed and the
+        * matching end when closed, so that nested objects can be written with
+        * try-with-resources blocks.
+        */
+       private static final class JsonObject implements AutoCloseable {
+
+               private final JsonGenerator gen;
+
+               /** Starts an object without a field name via writeStartObject(). */
+               JsonObject(JsonGenerator gen) throws IOException {
+                       this.gen = gen;
+                       gen.writeStartObject();
+               }
+
+               /** Starts an object as the value of field {@code name} via writeObjectFieldStart(name). */
+               private JsonObject(JsonGenerator gen, String name) throws 
IOException {
+                       this.gen = gen;
+                       gen.writeObjectFieldStart(name);
+               }
+
+               /** Writes the end of the object; the generator itself is not closed here. */
+               @Override
+               public void close() throws IOException {
+                       gen.writeEndObject();
+               }
+       }
+
+       /**
+        * Test helper that writes the start of a named JSON array field when constructed
+        * and the matching end when closed, for use in try-with-resources blocks.
+        */
+       private static final class JsonArray implements AutoCloseable {
+
+               private final JsonGenerator gen;
+
+               /** Starts an array as the value of field {@code name} via writeArrayFieldStart(name). */
+               JsonArray(JsonGenerator gen, String name) throws IOException {
+                       this.gen = gen;
+                       gen.writeArrayFieldStart(name);
+               }
+
+               /** Writes the end of the array; the generator itself is not closed here. */
+               @Override
+               public void close() throws IOException {
+                       gen.writeEndArray();
+               }
+       }
 }

Reply via email to