Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199196388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -18,64 +18,118 @@ package org.apache.flink.runtime.rest.messages.job; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; /** * Request for submitting a job. * - * <p>We currently require the job-jars to be uploaded through the blob-server. + * <p>This request only contains the names of files that must be present on the server, and defines how these files are + * interpreted. */ public final class JobSubmitRequestBody implements RequestBody { - private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName"; + private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames"; + private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames"; - /** - * The serialized job graph. - */ - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) - public final byte[] serializedJobGraph; + @JsonProperty(FIELD_NAME_JOB_GRAPH) + public final String jobGraphFileName; - public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { - this(serializeJobGraph(jobGraph)); - } + @JsonProperty(FIELD_NAME_JOB_JARS) + public final Collection<String> jarFileNames; + + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) + public final Collection<DistributedCacheFile> artifactFileNames; - @JsonCreator public JobSubmitRequestBody( - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) { - this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph); + @JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName, + @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> jarFileNames, + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) Collection<DistributedCacheFile> artifactFileNames) { + this.jobGraphFileName = jobGraphFileName; + this.jarFileNames = jarFileNames; + this.artifactFileNames = artifactFileNames; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobSubmitRequestBody that = (JobSubmitRequestBody) o; + return Objects.equals(jobGraphFileName, that.jobGraphFileName) && + Objects.equals(jarFileNames, that.jarFileNames) && + Objects.equals(artifactFileNames, that.artifactFileNames); } @Override public int hashCode() { - return 71 * Arrays.hashCode(this.serializedJobGraph); + return Objects.hash(jobGraphFileName, jarFileNames, artifactFileNames); } @Override - public boolean equals(Object object) { - if (object instanceof JobSubmitRequestBody) { - JobSubmitRequestBody other = (JobSubmitRequestBody) object; - return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph); - } - return false; + public String toString() { + return "JobSubmitRequestBody{" + + "jobGraphFileName='" + jobGraphFileName + '\'' + + ", jarFileNames=" + jarFileNames + + ", artifactFileNames=" + artifactFileNames + + '}'; } - private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) { - ObjectOutputStream out = new ObjectOutputStream(baos); + /** + * Descriptor for a distributed cache file. + */ + public static class DistributedCacheFile { + private static final String FIELD_NAME_ENTRY_NAME = "entryName"; + private static final String FIELD_NAME_FILE_NAME = "fileName"; --- End diff -- I know it is out of scope, but it would have been nice if `fileName == entryName`.
---