Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199196388
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
    @@ -18,64 +18,118 @@
     
     package org.apache.flink.runtime.rest.messages.job;
     
    -import org.apache.flink.runtime.jobgraph.JobGraph;
     import org.apache.flink.runtime.rest.messages.RequestBody;
    -import org.apache.flink.util.Preconditions;
     
     import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
     import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
     
    -import java.io.ByteArrayOutputStream;
    -import java.io.IOException;
    -import java.io.ObjectOutputStream;
    -import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Objects;
     
     /**
      * Request for submitting a job.
      *
    - * <p>We currently require the job-jars to be uploaded through the 
blob-server.
    + * <p>This request only contains the names of files that must be present 
on the server, and defines how these files are
    + * interpreted.
      */
     public final class JobSubmitRequestBody implements RequestBody {
     
    -   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
    +   private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
    +   private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
    +   private static final String FIELD_NAME_JOB_ARTIFACTS = 
"jobArtifactFileNames";
     
    -   /**
    -    * The serialized job graph.
    -    */
    -   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
    -   public final byte[] serializedJobGraph;
    +   @JsonProperty(FIELD_NAME_JOB_GRAPH)
    +   public final String jobGraphFileName;
     
    -   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
    -           this(serializeJobGraph(jobGraph));
    -   }
    +   @JsonProperty(FIELD_NAME_JOB_JARS)
    +   public final Collection<String> jarFileNames;
    +
    +   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
    +   public final Collection<DistributedCacheFile> artifactFileNames;
     
    -   @JsonCreator
        public JobSubmitRequestBody(
    -                   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
    -           this.serializedJobGraph = 
Preconditions.checkNotNull(serializedJobGraph);
    +                   @JsonProperty(FIELD_NAME_JOB_GRAPH) String 
jobGraphFileName,
    +                   @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> 
jarFileNames,
    +                   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) 
Collection<DistributedCacheFile> artifactFileNames) {
    +           this.jobGraphFileName = jobGraphFileName;
    +           this.jarFileNames = jarFileNames;
    +           this.artifactFileNames = artifactFileNames;
    +   }
    +
    +   @Override
    +   public boolean equals(Object o) {
    +           if (this == o) {
    +                   return true;
    +           }
    +           if (o == null || getClass() != o.getClass()) {
    +                   return false;
    +           }
    +           JobSubmitRequestBody that = (JobSubmitRequestBody) o;
    +           return Objects.equals(jobGraphFileName, that.jobGraphFileName) 
&&
    +                   Objects.equals(jarFileNames, that.jarFileNames) &&
    +                   Objects.equals(artifactFileNames, 
that.artifactFileNames);
        }
     
        @Override
        public int hashCode() {
    -           return 71 * Arrays.hashCode(this.serializedJobGraph);
    +           return Objects.hash(jobGraphFileName, jarFileNames, 
artifactFileNames);
        }
     
        @Override
    -   public boolean equals(Object object) {
    -           if (object instanceof JobSubmitRequestBody) {
    -                   JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
    -                   return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
    -           }
    -           return false;
    +   public String toString() {
    +           return "JobSubmitRequestBody{" +
    +                   "jobGraphFileName='" + jobGraphFileName + '\'' +
    +                   ", jarFileNames=" + jarFileNames +
    +                   ", artifactFileNames=" + artifactFileNames +
    +                   '}';
        }
     
    -   private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
    -           try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 
* 1024)) {
    -                   ObjectOutputStream out = new ObjectOutputStream(baos);
    +   /**
    +    * Descriptor for a distributed cache file.
    +    */
    +   public static class DistributedCacheFile {
    +           private static final String FIELD_NAME_ENTRY_NAME = "entryName";
    +           private static final String FIELD_NAME_FILE_NAME = "fileName";
    --- End diff --
    
    I know it is out of scope, but it would have been nice if `fileName == 
entryName`.


---

Reply via email to