You are correct in that I am trying to publish inside of a foreachRDD loop. I am currently refactoring and will try publishing inside the foreachPartition loop. Below is the code showing the way it is currently written, thanks!
object myData { def main(args: Array[String]) { val ssc = new StreamingContext("local[8]", "Data", Seconds(10)) ssc.checkpoint("checkpoint") val topicMap = Map("pagehit.data" -> 1) val factory = new ConnectionFactory() factory.setUsername("officialUsername") factory.setPassword("crypticPassword") factory.setVirtualHost("/") factory.setHost("rabbit-env") factory.setPort(0000) val connection = factory.newConnection() val SQLChannel = connection.createChannel() SQLChannel.queueDeclare("SQLQueue", true, false, false, null) val Pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1", topicMap).map(_._2) //PARSE SOME JSON ETC windowStream.foreachRDD(pagehit => { val mongoClient = MongoClient("my-mongodb") val db = mongoClient("myClient") val SQLCollection = db("SQLCalls") val callArray = pagehit.map(_._1).collect val avg = (callArray.reduceLeft[Long](_+_))/callArray.length val URL = pagehit.take(1).map(_._2) SQLCollection += MongoDBObject("URL" -> URL(0).substring(7, URL(0).length - 1), "Avg Page Load Time" -> avg) val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace) val byteArray = toBuildJSON.mkString.getBytes() SQLChannel.basicPublish("", "SQLQueue", null, byteArray) }) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org