Hi, everyone
I've run into a problem writing data to MongoDB inside mapPartitions. My code is as follows:

    import com.mongodb.{BasicDBObject, MongoClient}

    val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
    // some transformations
    val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
    val newRDD = rdd.mapPartitions(args => {
      // one MongoDB connection per partition
      val mongoClient = new MongoClient("host", port)
      val db = mongoClient.getDB("db")
      val coll = db.getCollection("collectionA")
      args.map(arg => {
        coll.insert(new BasicDBObject("pkg", arg))
        arg
      })
      mongoClient.close()
      args
    })
    newRDD.saveAsTextFile("hdfs://host:port/path")

The application saves the data to HDFS correctly, but nothing is written to MongoDB. Is there something wrong with this code? I know that collecting newRDD to the driver and then saving it to MongoDB would work, but would the subsequent saveAsTextFile then read the source data from HDFS all over again?
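A likely cause: `Iterator.map` on the partition iterator is lazy, so the closure containing `coll.insert` only runs when something consumes the mapped iterator. In the code above that mapped iterator is discarded, `mongoClient.close()` runs straight away, and the untouched `args` is returned. That would explain why HDFS receives every record while MongoDB receives nothing. The plain-Scala sketch below reproduces the effect without Spark or MongoDB; the `inserted` buffer is a hypothetical stand-in for `coll.insert`, and `lazyVersion`/`eagerVersion` are illustrative names, not part of any API.

```scala
import scala.collection.mutable.ListBuffer

object LazyIteratorDemo {
  // Hypothetical stand-in for coll.insert(...): records each "inserted" element.
  val inserted: ListBuffer[String] = ListBuffer.empty[String]

  // Mirrors the original mapPartitions body: map is called only for its side
  // effect, but the mapped iterator is discarded and never consumed, so the
  // closure (and therefore the insert) never runs.
  def lazyVersion(partition: Iterator[String]): Iterator[String] = {
    partition.map { arg =>
      inserted += arg
      arg
    }
    partition
  }

  // Materializes the partition once, performs every insert eagerly, and
  // returns a fresh iterator over the same elements for downstream use.
  def eagerVersion(partition: Iterator[String]): Iterator[String] = {
    val items = partition.toList
    items.foreach(arg => inserted += arg)
    items.iterator
  }

  def main(args: Array[String]): Unit = {
    val out1 = lazyVersion(Iterator("a", "b", "c")).toList
    println(s"lazy:  returned=$out1 inserted=${inserted.toList}")
    inserted.clear()
    val out2 = eagerVersion(Iterator("a", "b", "c")).toList
    println(s"eager: returned=$out2 inserted=${inserted.toList}")
  }
}
```

Simply returning the mapped iterator instead of `args` would not be enough in the original job, because `mongoClient.close()` would still run before any insert happens. In Spark terms, the same reasoning suggests either materializing the partition as in `eagerVersion` (which holds a whole partition in memory) or doing the writes in `rdd.foreachPartition { ... }`, which is an action and therefore actually executes. On the second question: each action recomputes the RDD's lineage, including the `textFile` read from HDFS, unless the RDD is persisted, so calling `rdd.persist()` (or `cache()`) before running both the MongoDB write and `saveAsTextFile` should avoid reading the source twice, as long as the cached partitions are not evicted.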
Thanks
qinwei