[ 
https://issues.apache.org/jira/browse/FLINK-1616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14346681#comment-14346681
 ] 

ASF GitHub Bot commented on FLINK-1616:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/451#discussion_r25763670
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -601,452 +482,295 @@ public int compare(ExecutionGraph o1, 
ExecutionGraph o2) {
        }
        
        /**
    -    * Executes the cancel action.
    +    * Executes the CANCEL action.
         * 
         * @param args Command line arguments for the cancel action.
         */
        protected int cancel(String[] args) {
    -           // Parse command line options
    -           CommandLine line;
    +           LOG.info("Running 'cancel' command.");
    +
    +           CancelOptions options;
                try {
    -                   line = parser.parse(CANCEL_OPTIONS, args, false);
    -                   evaluateGeneralOptions(line);
    -           }
    -           catch (MissingOptionException e) {
    -                   return handleArgException(e);
    -           }
    -           catch (MissingArgumentException e) {
    -                   return handleArgException(e);
    +                   options = CliFrontendParser.parseCancelCommand(args);
                }
    -           catch (UnrecognizedOptionException e) {
    +           catch (CliArgsException e) {
                        return handleArgException(e);
                }
    -           catch (Exception e) {
    -                   return handleError(e);
    +           catch (Throwable t) {
    +                   return handleError(t);
                }
    -           
    -           if (printHelp) {
    -                   printHelpForCancel();
    +
    +           // evaluate help flag
    +           if (options.isPrintHelp()) {
    +                   CliFrontendParser.printHelpForCancel();
                        return 0;
                }
                
    -           String[] cleanedArgs = line.getArgs();
    +           String[] cleanedArgs = options.getArgs();
                JobID jobId;
     
                if (cleanedArgs.length > 0) {
                        String jobIdString = cleanedArgs[0];
                        try {
                                jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
    -                   } catch (Exception e) {
    +                   }
    +                   catch (Exception e) {
    +                           LOG.error("Error: The value for the Job ID is 
not a valid ID.");
                                System.out.println("Error: The value for the 
Job ID is not a valid ID.");
                                return 1;
                        }
    -           } else {
    +           }
    +           else {
    +                   LOG.error("Missing JobID in the command line 
arguments.");
                        System.out.println("Error: Specify a Job ID to cancel a 
job.");
                        return 1;
                }
                
                try {
    -                   ActorRef jobManager = getJobManager(line, 
getGlobalConfiguration());
    -
    -                   if (jobManager == null) {
    -                           return 1;
    -                   }
    -
    -                   final Future<Object> response = 
Patterns.ask(jobManager, new CancelJob(jobId),
    -                                   new Timeout(getAkkaTimeout()));
    +                   ActorRef jobManager = getJobManager(options);
    +                   Future<Object> response = Patterns.ask(jobManager, new 
CancelJob(jobId), new Timeout(askTimeout));
     
                        try {
    -                           Await.ready(response, getAkkaTimeout());
    -                   } catch (Exception exception) {
    -                           throw new IOException("Canceling the job with 
job ID " + jobId + " failed.",
    -                                           exception);
    +                           Await.result(response, askTimeout);
    --- End diff --
    
    My bad, we want the exception.


> Action "list -r" gives IOException when there are running jobs
> --------------------------------------------------------------
>
>                 Key: FLINK-1616
>                 URL: https://issues.apache.org/jira/browse/FLINK-1616
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Priority: Minor
>
> Here's the full exception:
> java.io.IOException: Could not retrieve running jobs from job manager.
>       at org.apache.flink.client.CliFrontend.list(CliFrontend.java:528)
>       at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1089)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1114)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka.tcp://flink@10.20.0.25:13245/user/jobmanager#-1517424081]] after 
> [100000 ms]
>       at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>       at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>       at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:476)
>       at 
> akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:282)
>       at 
> akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:280)
>       at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:688)
>       at 
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:617)
>       at 
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:617)
>       at 
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:617)
>       at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:641)
>       at 
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:808)
>       at 
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:811)
>       at 
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:804)
>       at 
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:804)
>       at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
>       at 
> akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:804)
>       at 
> akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:638)
>       at 
> akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:638)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>       at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>       at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>       at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>       at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>       at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>       at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> If there are no running jobs, no exception is thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to