Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r199196388
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
---
@@ -18,64 +18,118 @@
package org.apache.flink.runtime.rest.messages.job;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
/**
* Request for submitting a job.
*
- * <p>We currently require the job-jars to be uploaded through the
blob-server.
+ * <p>This request only contains the names of files that must be present
on the server, and defines how these files are
+ * interpreted.
*/
public final class JobSubmitRequestBody implements RequestBody {
- private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH =
"serializedJobGraph";
+ private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+ private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+ private static final String FIELD_NAME_JOB_ARTIFACTS =
"jobArtifactFileNames";
- /**
- * The serialized job graph.
- */
- @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
- public final byte[] serializedJobGraph;
+ @JsonProperty(FIELD_NAME_JOB_GRAPH)
+ public final String jobGraphFileName;
- public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
- this(serializeJobGraph(jobGraph));
- }
+ @JsonProperty(FIELD_NAME_JOB_JARS)
+ public final Collection<String> jarFileNames;
+
+ @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+ public final Collection<DistributedCacheFile> artifactFileNames;
- @JsonCreator
public JobSubmitRequestBody(
- @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[]
serializedJobGraph) {
- this.serializedJobGraph =
Preconditions.checkNotNull(serializedJobGraph);
+ @JsonProperty(FIELD_NAME_JOB_GRAPH) String
jobGraphFileName,
+ @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String>
jarFileNames,
+ @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
Collection<DistributedCacheFile> artifactFileNames) {
+ this.jobGraphFileName = jobGraphFileName;
+ this.jarFileNames = jarFileNames;
+ this.artifactFileNames = artifactFileNames;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+ return Objects.equals(jobGraphFileName, that.jobGraphFileName)
&&
+ Objects.equals(jarFileNames, that.jarFileNames) &&
+ Objects.equals(artifactFileNames,
that.artifactFileNames);
}
@Override
public int hashCode() {
- return 71 * Arrays.hashCode(this.serializedJobGraph);
+ return Objects.hash(jobGraphFileName, jarFileNames,
artifactFileNames);
}
@Override
- public boolean equals(Object object) {
- if (object instanceof JobSubmitRequestBody) {
- JobSubmitRequestBody other = (JobSubmitRequestBody)
object;
- return Arrays.equals(this.serializedJobGraph,
other.serializedJobGraph);
- }
- return false;
+ public String toString() {
+ return "JobSubmitRequestBody{" +
+ "jobGraphFileName='" + jobGraphFileName + '\'' +
+ ", jarFileNames=" + jarFileNames +
+ ", artifactFileNames=" + artifactFileNames +
+ '}';
}
- private static byte[] serializeJobGraph(JobGraph jobGraph) throws
IOException {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64
* 1024)) {
- ObjectOutputStream out = new ObjectOutputStream(baos);
+ /**
+ * Descriptor for a distributed cache file.
+ */
+ public static class DistributedCacheFile {
+ private static final String FIELD_NAME_ENTRY_NAME = "entryName";
+ private static final String FIELD_NAME_FILE_NAME = "fileName";
--- End diff --
I know it is out of scope, but it would have been nice if `fileName ==
entryName`.
---