dmvk commented on code in PR #22153:
URL: https://github.com/apache/flink/pull/22153#discussion_r1136854135


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Information about the parallelism of job vertices. */
+public class JobResourceRequirements implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A key for an internal config option (intentionally prefixed with 
$internal to make this
+     * explicit), that we'll serialize the {@link JobResourceRequirements} 
into, when writing it to
+     * {@link JobGraph}.
+     */
+    private static final String JOB_RESOURCE_REQUIREMENTS_KEY =
+            "$internal.job-resource-requirements";
+
+    private static final JobResourceRequirements EMPTY =
+            new JobResourceRequirements(Collections.emptyMap());
+
+    /**
+     * Write {@link JobResourceRequirements resource requirements} into the 
configuration of a given
+     * {@link JobGraph}.
+     *
+     * @param jobGraph job graph to write requirements to
+     * @param jobResourceRequirements resource requirements to write
+     * @throws IOException in case we're not able to serialize requirements 
into the configuration
+     */
+    public static void writeToJobGraph(
+            JobGraph jobGraph, JobResourceRequirements 
jobResourceRequirements) throws IOException {
+        InstantiationUtil.writeObjectToConfig(
+                jobResourceRequirements,
+                jobGraph.getJobConfiguration(),
+                JOB_RESOURCE_REQUIREMENTS_KEY);
+    }
+
+    /**
+     * Read {@link JobResourceRequirements resource requirements} from the 
configuration of a given
+     * {@link JobGraph}.
+     *
+     * @param jobGraph job graph to read requirements from
+     * @throws IOException in case we're not able to deserialize requirements 
from the configuration
+     * @throws ClassNotFoundException in case some deserialized classes are 
missing on the classpath
+     */
+    public static Optional<JobResourceRequirements> readFromJobGraph(JobGraph 
jobGraph)
+            throws IOException, ClassNotFoundException {

Review Comment:
   > I guess the only way to throw a ClassNotFoundException here is for 
JobResourceRequirements (or inner ones) class to be missing ?
   
   Or if the JRR was written by a different Flink version (this method will be 
used for reading from a JobGrapStore).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to