You are correct that I am trying to publish inside a foreachRDD loop.
I am currently refactoring and will try publishing inside a
foreachPartition loop instead.  Below is the code as it is currently
written, thanks!


import com.mongodb.casbah.Imports._                      // MongoClient, MongoDBObject
import com.rabbitmq.client.ConnectionFactory
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object myData {
  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
    ssc.checkpoint("checkpoint")
    val topicMap = Map("pagehit.data" -> 1)

    // RabbitMQ connection and channel, created once on the driver
    val factory = new ConnectionFactory()
    factory.setUsername("officialUsername")
    factory.setPassword("crypticPassword")
    factory.setVirtualHost("/")
    factory.setHost("rabbit-env")
    factory.setPort(0000)
    val connection = factory.newConnection()

    val SQLChannel = connection.createChannel()
    SQLChannel.queueDeclare("SQLQueue", true, false, false, null)

    val Pipe = KafkaUtils.createStream(ssc,
      "Zookeeper_1,Zookeeper_2,Zookeeper_3", "Cons1",
      topicMap).map(_._2)

    // PARSE SOME JSON ETC -- builds baseMsg/closingBrace and turns Pipe into
    // windowStream, a windowed DStream of (pageLoadTime, URL) pairs

    windowStream.foreachRDD(pagehit => {
      val mongoClient = MongoClient("my-mongodb")
      val db = mongoClient("myClient")
      val SQLCollection = db("SQLCalls")

      // average page-load time across the window, plus a sample URL
      val callArray = pagehit.map(_._1).collect()
      val avg = callArray.reduceLeft[Long](_ + _) / callArray.length
      val URL = pagehit.take(1).map(_._2)

      SQLCollection += MongoDBObject(
        "URL" -> URL(0).substring(7, URL(0).length - 1),
        "Avg Page Load Time" -> avg)

      val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
      val byteArray = toBuildJSON.mkString.getBytes()

      SQLChannel.basicPublish("", "SQLQueue", null, byteArray)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
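
In case it's useful, here is a rough sketch of the foreachPartition version
I have in mind.  The connection and channel are created inside
foreachPartition so they live on the executors instead of being captured
from the driver, and this sketch publishes one message per page hit rather
than one averaged message per window; the JSON payload below is just a
placeholder for whatever the parsing step actually builds.

    windowStream.foreachRDD(pagehit => {
      pagehit.foreachPartition(partition => {
        // Build the RabbitMQ connection on the executor; the driver-side
        // Channel is not serializable, so it cannot be used in this closure.
        val factory = new ConnectionFactory()
        factory.setUsername("officialUsername")
        factory.setPassword("crypticPassword")
        factory.setVirtualHost("/")
        factory.setHost("rabbit-env")
        factory.setPort(0000)
        val connection = factory.newConnection()
        val channel = connection.createChannel()
        channel.queueDeclare("SQLQueue", true, false, false, null)

        partition.foreach { case (loadTime, url) =>
          // placeholder payload -- the real message comes from the parsed JSON
          val byteArray = s"""{"url":"$url","loadTime":$loadTime}""".getBytes()
          channel.basicPublish("", "SQLQueue", null, byteArray)
        }

        channel.close()
        connection.close()
      })
    })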


