[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341536#comment-15341536
 ] 

Maximilian Michels commented on FLINK-4051:
-------------------------------------------

It looks to me that the {{nextDelievery()}} method is interruptible. It throws 
an {{InterruptedException}} in case it is interrupted. As [~rmetzger] pointed 
out Flink sets the interrupted flag on the source thread during cancellation.

I've just tested this behavior using this test class:

{code:java}
public class RMQSourceTest {
        public static void main(String[] args) throws Exception {

                final RMQConnectionConfig config = new 
RMQConnectionConfig.Builder()
                                .setHost("localhost")
                                .setPort(5672)
                                .setVirtualHost("/")
                                .setUserName("guest")
                                .setPassword("guest")
                                .build();

                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

                env.addSource(new RMQSource<>(config, "test", new 
SimpleStringSchema())).print();

                env.execute();
        }
}
{code}

Then I deployed a Jar on a Flink cluster with RabbitMQ running on localhost on 
port 5672:

{code:none}
flink run -c org.myorg.quickstart.RMQSourceTest quickstart-0.1.jar
{code}

Nothing was printed of course because the queue is empty :) Then I cancelled:

{code:none}
$ ./flink list
------------------ Running/Restarting Jobs -------------------
21.06.2016 12:31:25 : 122456d90688011df6f8fa73de2d036c : Flink Streaming Job 
(RUNNING)
--------------------------------------------------------------
No scheduled jobs.
$ flink cancel 122456d90688011df6f8fa73de2d036c
{code}

Here's the output of the program:

{noformat}
06/21/2016 12:31:25     Job execution switched to status RUNNING.
06/21/2016 12:31:25     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
SCHEDULED
06/21/2016 12:31:25     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
DEPLOYING
06/21/2016 12:31:25     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
RUNNING
06/21/2016 12:31:50     Job execution switched to status CANCELLING.
06/21/2016 12:31:50     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
CANCELING
06/21/2016 12:31:50     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
CANCELED
06/21/2016 12:31:50     Job execution switched to status CANCELED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job was cancelled.
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
        at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:91)
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:355)
        at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
        at org.myorg.quickstart.RMQSourceTest.main(RMQSourceTest.java:41)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:297)
        at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:740)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:965)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1008)
Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
cancelled.
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:798)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
        at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}

It seems like everything works as expected, i.e. the source is interrupted 
correctly. Note that {{nextDelievery()}} doesn't prevent creation of 
checkpoints. So it generally fine that it blocks until messages are available.

If the issue is resolved I would like to close this issue. Please let us know 
if you experienced any other problems.

> RabbitMQ Source might not react to cancel signal
> ------------------------------------------------
>
>                 Key: FLINK-4051
>                 URL: https://issues.apache.org/jira/browse/FLINK-4051
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Robert Metzger
>            Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to