Well, we are using it to store large binary files. I get that distributing
the processing will allow faster times when doing multiple reads/writes, but
I didn't realize that map and reduce would help when writing one file.

Ananth T Sarathy
On Wed, Aug 19, 2009 at 12:14 PM, Edward Capriolo <[email protected]> wrote:

> Ananth,
>
> That is your issue, really.
>
> For example: I have 20 web servers and I wish to download all the
> weblogs from all of them into hadoop.
>
> If you write a top-down program that uses FSDataOutput, you are using
> hadoop halfway. You are using the distributed file system, but you
> are not doing any distributed processing.
>
> Better is to specify all the servers/files you wish to download as
> your input file. Tell hadoop to use NLineInputFormat and move your code
> inside a map function. Since hadoop can run multiple mappers,
> -Dmapred.map.tasks=6 will cause 6 fetchers to run in parallel. You can
> set this as high as you are comfortable with.
>
> Also, now that you are using m/r, you don't have to write files with
> FSDataOutputStream; you can use output.collect() to make a sequence
> file.
>
> In my case I am using commons-FTP and FSDataOutputStream (not
> output.collect()), as I do not want one big sequence file; I want the
> actual files as they exist on the web server, and I will merge them
> down the line in my process. This works very well. I could turn the
> number of mappers higher, but I don't want to beat up my web servers
> and network any more. (Hint: turn off speculative execution.)
>
> Now you know all my secrets. Good luck :)
>
>
> On Wed, Aug 19, 2009 at 11:45 AM, Ananth T. Sarathy <[email protected]> wrote:
> > Right now, just in a top-down program. I am still learning this, so
> > if I need to put this in a map and reduce to optimize speed, I will.
> > Right now I am just testing certain things and getting a skeleton to
> > write and pull files from the S3 storage. Actual implementation is
> > still being engineered.
> >
> > Ananth T Sarathy
> >
> > On Wed, Aug 19, 2009 at 11:11 AM, Edward Capriolo <[email protected]> wrote:
> >
> >> >> It would be as fast as underlying filesystem goes.
> >>
> >> I would not agree with that statement. There is overhead. If you have
> >> a single-threaded process writing many small files, you do not get
> >> the parallel write speed. In some testing I did, writing a small file
> >> can take 30-300 ms. So if you have 9000 small files (like I did) and
> >> you are single-threaded, this takes a long time.
> >>
> >> If you orchestrate your task to use FSDataInput and FSDataOutput in
> >> the map or reduce phase, then each mapper or reducer is writing a
> >> file at a time. Now that is fast.
> >>
> >> Ananth, are you doing your r/w inside a map/reduce job, or are you
> >> just using FS* in a top-down program?
> >>
> >> On Wed, Aug 19, 2009 at 1:26 AM, Raghu Angadi <[email protected]> wrote:
> >> > Ananth T. Sarathy wrote:
> >> >>
> >> >> I am trying to download binary files stored in Hadoop, but there
> >> >> is like a 2-minute wait on a 20mb file when I try to execute the
> >> >> in.read(buf).
> >> >
> >> > What does this mean: 2 min to pipe 20mb, or one of your in.read()
> >> > calls took 2 minutes? Your code actually measures time for read
> >> > and write.
> >> >
> >> > There is nothing in FSDataInputStream to cause this slowdown. Do
> >> > you think anyone would use Hadoop otherwise? It would be as fast
> >> > as the underlying filesystem goes.
> >> >
> >> > Raghu.
> >> >
> >> >> Is there a better way to be doing this?
> >> >>
> >> >>     private void pipe(InputStream in, OutputStream out) throws IOException
> >> >>     {
> >> >>         System.out.println(System.currentTimeMillis() + " Starting to Pipe Data");
> >> >>         byte[] buf = new byte[1024];
> >> >>         int read = 0;
> >> >>         while ((read = in.read(buf)) >= 0)
> >> >>         {
> >> >>             out.write(buf, 0, read);
> >> >>             System.out.println(System.currentTimeMillis() + " Piping Data");
> >> >>         }
> >> >>         out.flush();
> >> >>         System.out.println(System.currentTimeMillis() + " Finished Piping Data");
> >> >>     }
> >> >>
> >> >>     public void readFile(String fileToRead, OutputStream out) throws IOException
> >> >>     {
> >> >>         System.out.println(System.currentTimeMillis() + " Start Read File");
> >> >>         Path inFile = new Path(fileToRead);
> >> >>         System.out.println(System.currentTimeMillis() + " Set Path");
> >> >>         // Validate the input path before reading.
> >> >>         if (!fs.exists(inFile))
> >> >>         {
> >> >>             throw new HadoopFileException("Specified file " + fileToRead + " not found.");
> >> >>         }
> >> >>         if (!fs.isFile(inFile))
> >> >>         {
> >> >>             throw new HadoopFileException("Specified file " + fileToRead + " is not a file.");
> >> >>         }
> >> >>         // Open inFile for reading.
> >> >>         System.out.println(System.currentTimeMillis() + " Opening Data Stream");
> >> >>         FSDataInputStream in = fs.open(inFile);
> >> >>         System.out.println(System.currentTimeMillis() + " Opened Data Stream");
> >> >>         // Read from the input stream and write to the output stream until EOF.
> >> >>         pipe(in, out);
> >> >>         // Close the streams when done.
> >> >>         out.close();
> >> >>         in.close();
> >> >>     }
> >> >>
> >> >> Ananth T Sarathy
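A note on the pipe() code above: with a 1 KB buffer, a 20 MB file triggers
roughly 20,000 reads and just as many println calls, so the logging itself can
dominate the measured time. A minimal sketch of a quieter copy using Hadoop's
own IOUtils.copyBytes with a larger buffer (the class name and the 64 KB buffer
size here are illustrative choices, not anything from the thread):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.io.IOUtils;

    public class StreamPipe
    {
        // Copies in to out with a 64 KB buffer and one timing line total,
        // instead of a 1 KB buffer and one println per read.
        public static void pipe(InputStream in, OutputStream out) throws IOException
        {
            long start = System.currentTimeMillis();
            IOUtils.copyBytes(in, out, 64 * 1024, false); // false: caller closes the streams
            out.flush();
            System.out.println("Piped in " + (System.currentTimeMillis() - start) + " ms");
        }
    }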

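And a rough sketch of the mapper-based fetch job Edward describes, against the
0.20-era org.apache.hadoop.mapred API. Each line of the job's input names one
file to fetch, and NLineInputFormat gives each mapper its own line(s), so the
map-task count sets the fetch parallelism. The class name, the "host path"
line layout, the FTP credentials, and the /logs destination directory are all
made up for illustration; only NLineInputFormat, mapred.map.tasks, and the
speculative-execution hint come from the thread:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.commons.net.ftp.FTP;
    import org.apache.commons.net.ftp.FTPClient;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // One fetch per input line; each line reads "host /path/to/logfile".
    public class LogFetchMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text>
    {
        private FileSystem fs;

        @Override
        public void configure(JobConf conf)
        {
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void map(LongWritable offset, Text line,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException
        {
            String[] parts = line.toString().split("\\s+");
            String host = parts[0];
            String remotePath = parts[1];

            FTPClient ftp = new FTPClient();
            ftp.connect(host);
            ftp.login("loguser", "secret");      // placeholder credentials
            ftp.setFileType(FTP.BINARY_FILE_TYPE);

            // Stream the remote file straight into HDFS, as in Edward's setup:
            // FSDataOutputStream per file rather than output.collect() into one
            // big sequence file. (A real job would check for null streams and
            // FTP reply codes, and report progress during long transfers.)
            InputStream in = ftp.retrieveFileStream(remotePath);
            Path dest = new Path("/logs/" + host + "/" + new Path(remotePath).getName());
            FSDataOutputStream out = fs.create(dest);
            IOUtils.copyBytes(in, out, 64 * 1024, true); // true: close both streams
            ftp.completePendingCommand();
            ftp.disconnect();

            reporter.progress();
            output.collect(new Text(host), new Text(dest.toString()));
        }
    }

The driver would then set conf.setInputFormat(NLineInputFormat.class) and
conf.setMapSpeculativeExecution(false), so a speculative duplicate of a slow
task does not fetch the same file from the same web server twice, and
-Dmapred.map.tasks=6 (or higher) sets how many fetchers run at once.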