GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68581644
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
        }
     
        /**
    -    * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -    * message to the job manager.
    +    * Asks the JobManager to dispose a savepoint.
    +    *
    +    * <p>There are two options for this:
    +    * <ul>
    +    * <li>Either the job the savepoint belongs to is still running, in which
    +    * case the user code class loader of the job is used.</li>
    +    * <li>Or the job terminated, in which case the user JARs have to be
    +    * uploaded before disposing the savepoint.</li>
    +    * </ul>
         */
    -   private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -           try {
    -                   ActorGateway jobManager = getJobManagerGateway(options);
    -                   logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -                   Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +   private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +           String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -                   Object result;
    -                   try {
    -                           logAndSysout("Waiting for response...");
    -                           result = Await.result(response, clientTimeout);
    +           JobID jobId = options.getJobId();
    +           String jarFile = options.getJarFilePath();
    +
    +           if (jobId != null && jarFile != null) {
    +                   throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +           }
    +
    +           ActorGateway jobManager = getJobManagerGateway(options);
    +
    +           final Future<Object> response;
    +           if (jobId != null) {
    +                   // Dispose with class loader of running job
    +                   logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +                   response = jobManager.ask(
    +                                   new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +                                   clientTimeout);
    +           } else if (jarFile != null) {
    +                   logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +                   // Dispose with uploaded user code loader
    +                   Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +                   PackagedProgram program = new PackagedProgram(
    +                                   new File(jarFile),
    +                                   options.getClasspaths(),
    +                                   options.getEntryPointClass(),
    +                                   options.getProgramArgs());
    +                   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
    +
    +                   JobGraph jobGraph;
    +                   if (flinkPlan instanceof StreamingPlan) {
    +                           jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
    +                   } else {
    +                           JobGraphGenerator gen = new JobGraphGenerator(this.config);
    +                           jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
                        }
    -                   catch (Exception e) {
    -                           throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
    +
    +                   for (URL jar : program.getAllLibraries()) {
    +                           try {
    +                                   jobGraph.addJar(new Path(jar.toURI()));
    +                           } catch (URISyntaxException e) {
    +                                   throw new RuntimeException("URL is invalid. This should not happen.", e);
    +                           }
                        }
     
    +                   jobGraph.setClasspaths(program.getClasspaths());
    +
    +                   logAndSysout("Uploading JAR files for savepoint disposal.");
    +                   JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
    --- End diff --
    
    Maybe we could also refactor the current code so that the `JobGraph` no
    longer uploads the jars to the blob server. Then we wouldn't have to
    construct a `JobGraph` at all just to upload the jars. I also think the
    `JobGraph` shouldn't be responsible for uploading the user code jars; it
    should simply store the file names.
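
A minimal sketch of the separation suggested above, not Flink's actual API. `JobDescription`, `BlobStore`, `InMemoryBlobStore`, and `JarUploader` are hypothetical stand-ins for `JobGraph`, the blob server, and an upload helper. The job description only stores jar file names, and the uploader works on a plain list of paths, so the savepoint disposal path could upload jars without building a job graph.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only; every type here is hypothetical, not a Flink class.
final class JarUploadSketch {

    /** Hypothetical stand-in for the blob server: stores a jar, returns a key. */
    interface BlobStore {
        String put(String jarPath);
    }

    /** In-memory store so the sketch runs without a cluster. */
    static final class InMemoryBlobStore implements BlobStore {
        final List<String> stored = new ArrayList<>();

        @Override
        public String put(String jarPath) {
            stored.add(jarPath);
            return "blob-" + stored.size();
        }
    }

    /** Hypothetical stand-in for JobGraph: it only records jar file names. */
    static final class JobDescription {
        private final List<String> jarPaths = new ArrayList<>();

        void addJar(String jarPath) {
            jarPaths.add(jarPath);
        }

        List<String> getJarPaths() {
            return Collections.unmodifiableList(jarPaths);
        }
    }

    /** Uploading is a separate concern that needs only a list of paths. */
    static final class JarUploader {
        private final BlobStore store;

        JarUploader(BlobStore store) {
            this.store = store;
        }

        List<String> upload(List<String> jarPaths) {
            List<String> keys = new ArrayList<>();
            for (String jarPath : jarPaths) {
                keys.add(store.put(jarPath));
            }
            return keys;
        }
    }

    public static void main(String[] args) {
        InMemoryBlobStore store = new InMemoryBlobStore();
        JarUploader uploader = new JarUploader(store);

        // Savepoint disposal: upload the user jars directly, no job description.
        List<String> disposalKeys = uploader.upload(Arrays.asList("user-code.jar"));

        // Job submission: the description supplies the names, the uploader uploads.
        JobDescription job = new JobDescription();
        job.addJar("user-code.jar");
        job.addJar("lib.jar");
        List<String> submissionKeys = uploader.upload(job.getJarPaths());

        System.out.println(disposalKeys + " " + submissionKeys);
    }
}
```

With this split, the disposal path would depend only on the uploader and the list of jar paths, while job submission keeps using the description as a passive holder of file names.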

