I am trying to write a simple "Hello World" kind of application using spark streaming and RabbitMq, in which Apache Spark Streaming will read message from RabbitMq via the RabbitMqReceiver <https://github.com/Stratio/rabbitmq-receiver> and print it in the console. But some how I am not able to print the string read from Rabbit Mq into console. The spark streaming code is printing the message below:-

|Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 |

The message is sent to the rabbitmq via the simple code below:-

|package helloWorld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "hello1"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World! is a code. Hi Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } |

I am trying to read messages via Apache Streaming as shown below:-

|package rabbitmq.example; import java.util.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.stratio.receiver.RabbitMQUtils; public class RabbitMqEx { public static void main(String[] args) { System.out.println("Creating Spark Configuration"); SparkConf conf = new SparkConf(); conf.setAppName("RabbitMq Receiver Example"); conf.setMaster("local[2]"); System.out.println("Retreiving Streaming Context from Spark Conf"); JavaStreamingContext streamCtx = new JavaStreamingContext(conf, Durations.seconds(2)); Map<String, String>rabbitMqConParams = new HashMap<String, String>(); rabbitMqConParams.put("host", "localhost"); rabbitMqConParams.put("queueName", "hello1"); System.out.println("Trying to connect to RabbitMq"); JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams); receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> arg0) throws Exception { System.out.println("Value Received " + arg0.toString()); return null; } } ); streamCtx.start(); streamCtx.awaitTermination(); } } |

The output console only has message like the following:-

|Creating Spark Configuration Retreiving Streaming Context from Spark Conf Trying to connect to RabbitMq Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 |

In the logs I see the following:-

|15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2 15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0) 15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong 15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong 15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jabong); users with modify permissions: Set(jabong) 15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started 15/11/18 13:20:46 INFO Remoting: Starting remoting 15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.3:42978] 15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on port 42978. 15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker 15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster 15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1 15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB 15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5 15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server 15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' on port 37150. 15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator 15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040 15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost 15/11/18 13:20:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306. 15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306 15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager 15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306) 15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager Trying to connect to RabbitMq 15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers 15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started 15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1 15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1 15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms 15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null 15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms 15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated com.stratio.receiver.RabbitMQInputDStream@5d00adc2 15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms 15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null 15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms 15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4c132773 15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time 1447833054000 15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms 15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler 15/11/18 13:20:53 INFO StreamingContext: StreamingContext started 15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) with 1 output partitions 15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at RabbitMqEx.java:38) 15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started 15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List() 15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List() 15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has no missing parents 15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with curMem=0, maxMem=993735475 15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 45.4 KB, free 947.7 MB) 15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with curMem=46496, maxMem=993735475 15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.8 KB, free 947.6 MB) 15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:47306 (size: 14.8 KB, free: 947.7 MB) 15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861 15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556) 15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 2729 bytes) 15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at time 1447833053800 15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator 15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread 15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver 15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost 15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost 15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel 15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured 15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel 15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting.. 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped 15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue 15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126) at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86) at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69) Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 5 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) at java.lang.Thread.run(Thread.java:745) 15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped 15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring 15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again: 15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing... 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0 15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0 15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms 15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms 15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time 1447833054000 ms (execution: 0.007 s) 15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() 15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata: 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again 15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver 15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost 15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost 15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel 15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured 15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel 15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting.. 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart 15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again 15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126) at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86) at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69) Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 5 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) at java.lang.Thread.run(Thread.java:745) 15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped 15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring 15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again: 15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing... 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0 15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0 15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms 15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 ms.0 from job set of time 1447833056000 ms |

Doing |list_queues| list the following:-
||

|sudo rabbitmqctl list_queues Listing queues ... hello1 2 |

I also printed the value of |arg0.count|. It is reporting 0. It seems spark streaming is not able to read messages from rabbitmq.

However I can read from the queue using a simple java receiver as mentioned here <https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java>.

Environment

 * RabbitMq Version - 3.5.6
 * Spark 1.5.2
 * Java 8 (Update 66)

Can some one let me know what is going wrong and how can I read message from RabbitMq via Spark Streaming.

Thanks,
D







Reply via email to