Hi Anchit, 
can you create more than one record in each dataset and test again?



> On Sep 26, 2015, at 18:00, Fengdong Yu <fengdo...@everstring.com> wrote:
> 
> Anchit,
> 
> Please ignore my input. You are right. Thanks.
> 
> 
> 
>> On Sep 26, 2015, at 17:27, Fengdong Yu <fengdo...@everstring.com> wrote:
>> 
>> Hi Anchit,
>> 
>> this is not what I expected, because you specified the HDFS directory in 
>> your code.
>> I've solved it like this:
>> 
>>        import org.apache.hadoop.io.{LongWritable, Text}
>>        import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
>>        import org.apache.spark.rdd.HadoopRDD
>> 
>>        val text = sc.hadoopFile(Args.input,
>>          classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
>>        val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
>> 
>>        // tag every record with the path of the file its split came from
>>        hadoopRdd.mapPartitionsWithInputSplit((inputSplit, iterator) => {
>>          val file = inputSplit.asInstanceOf[FileSplit]
>>          iterator.map(tp => (tp._1, new Text(file.getPath.toString + "," + tp._2.toString)))
>>        })
>> 
>> 
>> 
>> 
>>> On Sep 25, 2015, at 13:12, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>> 
>>> Hi Fengdong,
>>> 
>>> So I created two files in HDFS under a test folder.
>>> 
>>> test/dt=20100101.json
>>> { "key1" : "value1" }
>>> 
>>> test/dt=20100102.json
>>> { "key2" : "value2" }
>>> 
>>> Then, inside the PySpark shell:
>>> 
>>> rdd = sc.wholeTextFiles('./test/*')
>>> rdd.collect()
>>> [(u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'{ "key1" : 
>>> "value1" }'), (u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json', 
>>> u'{ "key2" : "value2" }')]
>>> import json
>>> def editMe(y, x):
>>>       j = json.loads(y)
>>>       j['source'] = x
>>>       return j
>>> 
>>> rdd.map(lambda (x,y): editMe(y,x)).collect()
>>> [{'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', 
>>> u'key1': u'value1'}, {u'key2': u'value2', 'source': 
>>> u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json'}]
>>> 
>>> Similarly, you could modify the function to return 'source' and 'date' with 
>>> some string manipulation, per your requirements, as in the sketch below.
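>>> 
>>> (A quick sketch of that idea; the helper name is made up, and the path 
>>> parsing assumes the test/dt=YYYYMMDD.json layout above, so adjust it to 
>>> your real directory names.)
>>> 
>>> import json
>>> import os
>>> 
>>> def addSourceAndDate(path, content):
>>>       j = json.loads(content)
>>>       # path looks like 'hdfs://localhost:9000/user/hduser/test/dt=20100101.json'
>>>       j['source'] = os.path.basename(os.path.dirname(path))            # 'test'
>>>       j['date'] = os.path.basename(path).split('=')[1].split('.')[0]   # '20100101'
>>>       return j
>>> 
>>> rdd.map(lambda (x, y): addSourceAndDate(x, y)).collect()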
>>> 
>>> Let me know if this helps.
>>> 
>>> Thanks,
>>> Anchit
>>> 
>>> 
>>> On 24 September 2015 at 23:55, Fengdong Yu <fengdo...@everstring.com> wrote:
>>> 
>>> Yes. For example, I have two data sets:
>>> 
>>> data set A: /data/test1/dt=20100101
>>> data set B: /data/test2/dt=20100202
>>> 
>>> 
>>> All data has the same JSON format, such as:
>>> { "key1" : "value1", "key2" : "value2" }
>>> 
>>> 
>>> My expected output:
>>> { "key1" : "value1", "key2" : "value2", "source" : "test1", "date" : "20100101" }
>>> { "key1" : "value1", "key2" : "value2", "source" : "test2", "date" : "20100202" }
>>> 
>>> 
>>>> On Sep 25, 2015, at 11:52, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>> 
>>>> Sure. May I ask for a sample input (could be just a few lines) and the 
>>>> output you are expecting, to bring clarity to my thoughts?
>>>> 
>>>> On Thu, Sep 24, 2015, 23:44 Fengdong Yu <fengdo...@everstring.com> wrote:
>>>> Hi Anchit, 
>>>> 
>>>> Thanks for the quick answer.
>>>> 
>>>> My exact question is: I want to add the HDFS location into each line of my 
>>>> JSON data.
>>>> 
>>>> 
>>>> 
>>>>> On Sep 25, 2015, at 11:25, Anchit Choudhry <anchit.choud...@gmail.com> wrote:
>>>>> 
>>>>> Hi Fengdong,
>>>>> 
>>>>> Thanks for your question.
>>>>> 
>>>>> Spark already has a function called wholeTextFiles within sparkContext 
>>>>> which can help you with that:
>>>>> 
>>>>> Python
>>>>> 
>>>>> For example, if you have the following files:
>>>>> 
>>>>> hdfs://a-hdfs-path/part-00000
>>>>> hdfs://a-hdfs-path/part-00001
>>>>> ...
>>>>> hdfs://a-hdfs-path/part-nnnnn
>>>>> 
>>>>> then rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path") gives you:
>>>>> 
>>>>> (a-hdfs-path/part-00000, its content)
>>>>> (a-hdfs-path/part-00001, its content)
>>>>> ...
>>>>> (a-hdfs-path/part-nnnnn, its content)
>>>>> 
>>>>> More info: 
>>>>> http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=wholetext#pyspark.SparkContext.wholeTextFiles
>>>>> 
>>>>> ------------
>>>>> 
>>>>> Scala
>>>>> 
>>>>> val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>>>> 
>>>>> More info: 
>>>>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@wholeTextFiles(String,Int):RDD[(String,String)]
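>>>>> 
>>>>> And as a rough end-to-end sketch of what you could then do (just one 
>>>>> possible approach; the output path '/data/combined' is made up, and each 
>>>>> input line is assumed to be a complete JSON object):
>>>>> 
>>>>> import json
>>>>> 
>>>>> def tagLines(path, content):
>>>>>       # add the originating file's HDFS path to every JSON line
>>>>>       result = []
>>>>>       for line in content.splitlines():
>>>>>             if line.strip():
>>>>>                   j = json.loads(line)
>>>>>                   j['source'] = path
>>>>>                   result.append(json.dumps(j))
>>>>>       return result
>>>>> 
>>>>> rdd = sc.wholeTextFiles('/data/*/*')
>>>>> rdd.flatMap(lambda pair: tagLines(pair[0], pair[1])) \
>>>>>       .saveAsTextFile('/data/combined')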
>>>>>  
>>>>> Let us know if this helps or if you need more help.
>>>>> 
>>>>> Thanks,
>>>>> Anchit Choudhry
>>>>> 
>>>>> On 24 September 2015 at 23:12, Fengdong Yu <fengdo...@everstring.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> I have multiple files in JSON format, such as:
>>>>> 
>>>>> /data/test1_data/sub100/test.data
>>>>> /data/test2_data/sub200/test.data
>>>>> 
>>>>> 
>>>>> I can do sc.textFile("/data/*/*"),
>>>>> 
>>>>> but I want to add { "source" : "HDFS_LOCATION" } to each line, then save 
>>>>> it to one target HDFS location.
>>>>> 
>>>>> How can I do this? Thanks.
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
