Hi Todd,

If I use the FileSystem API and I am on the local node - how do I get/read just the particular block that resides locally on that node? Do I just open an FSDataInputStream and seek() to the position where the block starts?
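Something along the lines of the sketch below is what I have in mind - completely untested, and the file path and the hostname comparison are just placeholders - so please correct me if this is not what you meant:

*************************************************************************
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalBlockRead {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/user/hive/warehouse/sample_07/sample_07.csv");

        // public API: ask the namenode where the blocks of this file live
        FileStatus status = fs.getFileStatus(file);
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        String localHost = InetAddress.getLocalHost().getHostName();

        FSDataInputStream in = fs.open(file);
        byte[] buf = new byte[64 * 1024];
        for (BlockLocation block : blocks) {
            // only touch blocks that have a replica on this node
            if (!Arrays.asList(block.getHosts()).contains(localHost)) {
                continue;
            }
            in.seek(block.getOffset());          // jump to the start of the block
            long remaining = block.getLength();  // read exactly one block worth of data
            while (remaining > 0) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (n < 0) break;
                remaining -= n;
                // ... hand "n" bytes from buf to the transformation ...
            }
        }
        in.close();
        fs.close();
    }
}
*************************************************************************

(I realize the hostname returned by InetAddress may not match what the namenode reports for the datanode, so in practice we would probably compare against the datanode's configured hostname instead.)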
Thanks,
David.


On 10.1.12 7:19 PM, "Todd Lipcon" <[email protected]> wrote:

>On Tue, Jan 10, 2012 at 4:32 AM, David Pavlis <[email protected]> wrote:
>> Hi Todd,
>>
>> Understood, I will re-think the MR approach. Nonetheless I like the idea of getting the block id and accessing the locally stored file directly.
>
>You really don't want to do this - what happens when the balancer moves the block from under you?
>
>> Is there a way (a public interface) to find out, for a particular datanode, where its local storage is rooted (which local subdirs it uses)?
>> I found that this info is available in the Storage class, but is there a "public" way of getting it - through some protocol or so?
>
>No, it's also a private API. If you use the FileSystem API, and you're on the local node, all of these optimizations will happen automatically for you, plus they'll keep working when you upgrade. Even MapReduce just uses the public APIs.
>
>-Todd
>
>>
>> On 9.1.12 9:30 PM, "Todd Lipcon" <[email protected]> wrote:
>>
>>>Hi David,
>>>
>>>I'd definitely recommend using MapReduce for this. What you've described is essentially identical to MR.
>>>
>>>Otherwise, you should use the public API FileSystem.getFileBlockLocations(), and then read the host names out of the returned BlockLocation struct. Then just use a normal FileSystem open call from that node - it will automatically pick the local replica for you without any further work.
>>>
>>>-Todd
>>>
>>>On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <[email protected]> wrote:
>>>> Hi Todd,
>>>>
>>>> Thanks for letting me know. OK - here is what I am trying to do (it is a POC for now):
>>>>
>>>> We have an ETL framework which helps with transforming data - parsing various formats, reading from DBs, aggregating, sorting, etc. We currently have a concept of a "cluster", which basically allows input data (say a data file) to be split/partitioned across several nodes, with one data transformation then executed on that data. The way it works is that we analyze what the transformation does, and if it is supposed to consume data that is spread over the cluster, we execute a slightly modified copy/instance of that transformation on each node of the cluster where some piece/partition of the data resides. We do not have a concept of any "clustered" filesystem - our partitioned data reside in ordinary files and there is no metadata layer on top. If we need one single output data file, then we just perform a merge operation. This is quite limiting, because if we need to manipulate such data, we have to do it piece by piece (on each participating node).
>>>>
>>>> So we essentially do split-transform-merge, with the merge being optional (it can be part of the transformation directly, so we don't create temp files).
>>>>
>>>> Here is the idea with HDFS - each of our transformation nodes becomes an HDFS datanode. Then, if we are to process particular input data split over several datanodes, we just instruct our transformation nodes to read the specific blocks of the file (the block/s which happen to be on the same physical machine where the transformation node, also a datanode, runs) - hence my interest in BlockReader.
>>>>
>>>> I was also considering wrapping our transformation job into a map-reduce job of Hadoop, but that seems a bit limiting, and we would also need to "take" the whole Hadoop stack and give it control over our jobs. But that still might be the right way.
>>>> Also, I need to solve writing of partitioned data - here I would like to control the block allocation somehow, as ideally a transformation running on a particular node would be reading locally stored blocks and outputting data to locally allocated blocks of the HDFS file.
>>>>
>>>> Well, I hope I explained the situation clearly enough.
>>>>
>>>> I will be thankful for any comments.
>>>>
>>>> Regards,
>>>>
>>>> David.
>>>>
>>>>
>>>> On 9.1.12 6:59 PM, "Todd Lipcon" <[email protected]> wrote:
>>>>
>>>>>Hi David,
>>>>>
>>>>>For what it's worth, you should be aware that you're calling internal APIs that have no guarantee of stability between versions. I can practically guarantee that your code will have to be modified for any HDFS upgrade you do. That's why these APIs are undocumented.
>>>>>
>>>>>Perhaps you can explain what your high-level goal is here, and we can suggest a supported mechanism for achieving it.
>>>>>
>>>>>-Todd
>>>>>
>>>>>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <[email protected]> wrote:
>>>>>> Hi Denny,
>>>>>>
>>>>>> Thanks a lot. I was able to make my code work.
>>>>>>
>>>>>> I am posting a small example below - in case somebody in the future has a similar need ;-)
>>>>>> (not handling replica datablocks).
>>>>>>
>>>>>> David.
>>>>>>
>>>>>>
>>>>>> *************************************************************************
>>>>>> public static void main(String args[]) {
>>>>>>     String filename = "/user/hive/warehouse/sample_07/sample_07.csv";
>>>>>>     int DATANODE_PORT = 50010;
>>>>>>     int NAMENODE_PORT = 8020;
>>>>>>     String HOST_IP = "192.168.1.230";
>>>>>>
>>>>>>     byte[] buf = new byte[1000];
>>>>>>
>>>>>>     try {
>>>>>>         ClientProtocol client = DFSClient.createNamenode(
>>>>>>             new InetSocketAddress(HOST_IP, NAMENODE_PORT), new Configuration());
>>>>>>
>>>>>>         LocatedBlocks located = client.getBlockLocations(filename, 0, Long.MAX_VALUE);
>>>>>>
>>>>>>         for (LocatedBlock block : located.getLocatedBlocks()) {
>>>>>>             Socket sock = SocketFactory.getDefault().createSocket();
>>>>>>             InetSocketAddress targetAddr = new InetSocketAddress(HOST_IP, DATANODE_PORT);
>>>>>>             NetUtils.connect(sock, targetAddr, 10000);
>>>>>>             sock.setSoTimeout(10000);
>>>>>>
>>>>>>             BlockReader reader = BlockReader.newBlockReader(sock, filename,
>>>>>>                 block.getBlock().getBlockId(), block.getBlockToken(),
>>>>>>                 block.getBlock().getGenerationStamp(), 0,
>>>>>>                 block.getBlockSize(), 1000);
>>>>>>
>>>>>>             int length;
>>>>>>             while ((length = reader.read(buf, 0, 1000)) > 0) {
>>>>>>                 //System.out.print(new String(buf, 0, length, "UTF-8"));
>>>>>>                 if (length < 1000) break;
>>>>>>             }
>>>>>>             reader.close();
>>>>>>             sock.close();
>>>>>>         }
>>>>>>     } catch (IOException ex) {
>>>>>>         ex.printStackTrace();
>>>>>>     }
>>>>>> }
>>>>>> *************************************************************************
>>>>>>
>>>>>>
>>>>>> From: Denny Ye <[email protected]>
>>>>>> Reply-To: <[email protected]>
>>>>>> Date: Mon, 9 Jan 2012 16:29:18 +0800
>>>>>> To: <[email protected]>
>>>>>> Subject: Re: How-to use DFSClient's BlockReader from Java
>>>>>>
>>>>>> hi David,
>>>>>> Please refer to the method "DFSInputStream#blockSeekTo"; it has the same purpose as yours.
>>>>>>
>>>>>> *************************************************************************
>>>>>> LocatedBlock targetBlock = getBlockAt(target, true);
>>>>>> assert (target == this.pos) : "Wrong postion " + pos + " expect " + target;
>>>>>> long offsetIntoBlock = target - targetBlock.getStartOffset();
>>>>>>
>>>>>> DNAddrPair retval = chooseDataNode(targetBlock);
>>>>>> chosenNode = retval.info;
>>>>>> InetSocketAddress targetAddr = retval.addr;
>>>>>>
>>>>>> try {
>>>>>>     s = socketFactory.createSocket();
>>>>>>     NetUtils.connect(s, targetAddr, socketTimeout);
>>>>>>     s.setSoTimeout(socketTimeout);
>>>>>>     Block blk = targetBlock.getBlock();
>>>>>>     Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
>>>>>>
>>>>>>     blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
>>>>>>         accessToken, blk.getGenerationStamp(),
>>>>>>         offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
>>>>>>         buffersize, verifyChecksum, clientName);
>>>>>> *************************************************************************
>>>>>>
>>>>>> -Regards
>>>>>> Denny Ye
>>>>>>
>>>>>> 2012/1/6 David Pavlis <[email protected]>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am relatively new to Hadoop and I am trying to utilize HDFS for my own application, where I want to take advantage of the data partitioning HDFS performs.
>>>>>>
>>>>>> The idea is that I get the list of individual blocks - the BlockLocations of a particular file - and then directly read those (go to the individual DataNodes). So far I found org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way to go.
>>>>>>
>>>>>> However, I am struggling with instantiating the BlockReader() class, namely creating the "Token<BlockTokenIdentifier>".
>>>>>>
>>>>>> Is there an example Java code showing how to access individual blocks of a particular file stored on HDFS?
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> David.
>>>>>
>>>>>
>>>>>--
>>>>>Todd Lipcon
>>>>>Software Engineer, Cloudera
>>>>
>>>
>>>
>>>--
>>>Todd Lipcon
>>>Software Engineer, Cloudera
>>
>
>
>--
>Todd Lipcon
>Software Engineer, Cloudera
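P.S. Regarding the output side I mentioned earlier in the thread (writing partitioned data to locally allocated blocks): if I understand the default placement policy correctly, when the writer runs on a machine that is also a datanode, the first replica of each block it writes is placed on that local datanode. So perhaps a plain FileSystem.create() per partition is already enough - a rough, untested sketch below, where the output directory and the per-host file naming are just placeholders of mine:

*************************************************************************
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalPartitionWrite {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // one output file per transformation node, e.g. /data/out/part-<hostname>
        String host = InetAddress.getLocalHost().getHostName();
        Path partition = new Path("/data/out/part-" + host);

        // With the default placement policy the first replica of every block of
        // this file should land on the local datanode, so a later transformation
        // running on the same node can read its partition locally.
        FSDataOutputStream out = fs.create(partition, true);
        out.write("transformed records go here\n".getBytes("UTF-8"));
        out.close();
        fs.close();
    }
}
*************************************************************************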
