Hi Anchit, can you create more than one record in each dataset and test again?
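(Editorial note: for the multi-record case asked about above, the only change needed to Anchit's editMe approach below is to split each file's content into lines before tagging. A minimal plain-Python sketch of that per-file step, runnable without Spark -- the helper name is made up, and in Spark this would sit inside a flatMap over the wholeTextFiles output:)

```python
import json

def annotate_records(path, content):
    """Split one whole-file payload into JSON records and tag each
    record with the HDFS path it came from (the per-(path, content)
    work a flatMap over wholeTextFiles would do)."""
    records = []
    for line in content.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        rec = json.loads(line)
        rec["source"] = path
        records.append(rec)
    return records

# Two records in a single file, as in the multi-record test case:
content = '{"key1": "value1"}\n{"key2": "value2"}'
out = annotate_records(
    "hdfs://localhost:9000/user/hduser/test/dt=20100101.json", content)
```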
> On Sep 26, 2015, at 18:00, Fengdong Yu <fengdo...@everstring.com> wrote:
>
> Anchit,
>
> please ignore my inputs. you are right. Thanks.
>
>> On Sep 26, 2015, at 17:27, Fengdong Yu <fengdo...@everstring.com> wrote:
>>
>> Hi Anchit,
>>
>> this is not what I expected, because you specified the HDFS directory in
>> your code. I've solved it like this:
>>
>>     val text = sc.hadoopFile(Args.input,
>>       classOf[TextInputFormat], classOf[LongWritable],
>>       classOf[Text], 2)
>>     val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
>>
>>     hadoopRdd.mapPartitionsWithInputSplit((inputSplit, iterator) => {
>>       val file = inputSplit.asInstanceOf[FileSplit]
>>       iterator.map(tp => (tp._1, new Text(file.toString + "," + tp._2.toString)))
>>     })
>>
>>> On Sep 25, 2015, at 13:12, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>
>>> Hi Fengdong,
>>>
>>> So I created two files in HDFS under a test folder.
>>>
>>> test/dt=20100101.json
>>> { "key1" : "value1" }
>>>
>>> test/dt=20100102.json
>>> { "key2" : "value2" }
>>>
>>> Then inside the PySpark shell:
>>>
>>> rdd = sc.wholeTextFiles('./test/*')
>>> rdd.collect()
>>> [(u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'{ "key1" : "value1" }'),
>>>  (u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json', u'{ "key2" : "value2" }')]
>>>
>>> import json
>>> def editMe(y, x):
>>>     j = json.loads(y)
>>>     j['source'] = x
>>>     return j
>>>
>>> rdd.map(lambda (x, y): editMe(y, x)).collect()
>>> [{'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'key1': u'value1'},
>>>  {u'key2': u'value2', 'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json'}]
>>>
>>> Similarly you could modify the function to return 'source' and 'date'
>>> with some string manipulation per your requirements.
>>>
>>> Let me know if this helps.
>>>
>>> Thanks,
>>> Anchit
>>>
>>> On 24 September 2015 at 23:55, Fengdong Yu <fengdo...@everstring.com> wrote:
>>>
>>> yes. such as I have two data sets:
>>>
>>> data set A: /data/test1/dt=20100101
>>> data set B: /data/test2/dt=20100202
>>>
>>> all data has the same JSON format, such as:
>>> {"key1" : "value1", "key2" : "value2"}
>>>
>>> my expected output:
>>> {"key1" : "value1", "key2" : "value2", "source" : "test1", "date" : "20100101"}
>>> {"key1" : "value1", "key2" : "value2", "source" : "test2", "date" : "20100202"}
>>>
>>>> On Sep 25, 2015, at 11:52, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>>
>>>> Sure. May I ask for a sample input (could be just a few lines) and the
>>>> output you are expecting, to bring clarity to my thoughts?
>>>>
>>>> On Thu, Sep 24, 2015, 23:44 Fengdong Yu <fengdo...@everstring.com> wrote:
>>>>
>>>> Hi Anchit,
>>>>
>>>> Thanks for the quick answer.
>>>>
>>>> my exact question is: I want to add the HDFS location into each line
>>>> in my JSON data.
>>>>
>>>>> On Sep 25, 2015, at 11:25, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>>>
>>>>> Hi Fengdong,
>>>>>
>>>>> Thanks for your question.
>>>>>
>>>>> Spark already has a function called wholeTextFiles on the SparkContext
>>>>> which can help you with that:
>>>>>
>>>>> Python
>>>>>
>>>>> hdfs://a-hdfs-path/part-00000
>>>>> hdfs://a-hdfs-path/part-00001
>>>>> ...
>>>>> hdfs://a-hdfs-path/part-nnnnn
>>>>>
>>>>> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>>>>
>>>>> (a-hdfs-path/part-00000, its content)
>>>>> (a-hdfs-path/part-00001, its content)
>>>>> ...
>>>>> (a-hdfs-path/part-nnnnn, its content)
>>>>>
>>>>> More info: http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=wholetext#pyspark.SparkContext.wholeTextFiles
>>>>>
>>>>> ------------
>>>>>
>>>>> Scala
>>>>>
>>>>> val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>>>>
>>>>> More info: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@wholeTextFiles(String,Int):RDD[(String,String)]
>>>>>
>>>>> Let us know if this helps or you need more help.
>>>>>
>>>>> Thanks,
>>>>> Anchit Choudhry
>>>>>
>>>>> On 24 September 2015 at 23:12, Fengdong Yu <fengdo...@everstring.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have multiple files with JSON format, such as:
>>>>>
>>>>> /data/test1_data/sub100/test.data
>>>>> /data/test2_data/sub200/test.data
>>>>>
>>>>> I can sc.textFile("/data/*/*")
>>>>>
>>>>> but I want to add {"source" : "HDFS_LOCATION"} to each line, then save
>>>>> it to one target HDFS location.
>>>>>
>>>>> How to do it? Thanks.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: dev-h...@spark.apache.org
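(Editorial note: the thread's end result -- Fengdong's expected output with "source" and "date" fields -- can be produced by exactly the string manipulation Anchit mentions. A plain-Python sketch of the per-line transform that would run inside rdd.map, runnable without Spark; the helper name is made up, and it assumes partition paths shaped like /data/<source>/dt=<date>:)

```python
import json

def tag_with_source_and_date(path, line):
    """Derive 'source' (parent directory name) and 'date' (the dt=
    partition value) from a path such as /data/test1/dt=20100101,
    and fold both into the JSON record on that line."""
    parts = path.strip("/").split("/")
    source = parts[-2]                 # e.g. "test1"
    date = parts[-1].split("=", 1)[1]  # e.g. "20100101" from "dt=20100101"
    rec = json.loads(line)
    rec["source"] = source
    rec["date"] = date
    return json.dumps(rec, sort_keys=True)

result = tag_with_source_and_date(
    "/data/test1/dt=20100101",
    '{"key1": "value1", "key2": "value2"}')
```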