[ https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mania Abdi updated HDFS-15206:
------------------------------
    Priority: Minor  (was: Major)

> HDFS synchronous reads from local file system
> ---------------------------------------------
>
>                 Key: HDFS-15206
>                 URL: https://issues.apache.org/jira/browse/HDFS-15206
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>         Environment: !Screenshot from 2020-03-03 22-07-26.png!
>            Reporter: Mania Abdi
>            Priority: Minor
>         Attachments: Screenshot from 2020-03-03 22-07-26.png
>
>
> Hello everyone,
> I ran a simple benchmark that runs {{hadoop fs -get /file1.txt}}, where 
> file1.txt is 1 MB, and captured the resulting workflow of requests with XTrace. 
> Evaluating the workflow trace, I noticed that the datanode issues synchronous 
> 64 KB read requests to the local file system, sends the data back, and waits 
> for completion before reading the next chunk. I walked through the HDFS code 
> to verify this pattern, and it holds. I have two suggestions: (1) since an 
> HDFS block is usually 128 MB, we could memory-map the block file via the 
> FileChannel class and let the file system prefetch and read ahead in the 
> background, instead of reading from disk synchronously; (2) use asynchronous 
> read operations against the datanode's local disk. Rough sketches of both 
> ideas are included below (after this paragraph and after the code listing). 
> Is there a reason behind the synchronous reads from the file system?
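> As a rough illustration of suggestion (1), the sketch below memory-maps a 
> block replica via FileChannel and then serves it in 64 KB packets out of the 
> page cache. This is only a standalone sketch under my assumptions, not a 
> patch against BlockSender; the class name MappedBlockReader and the packet 
> loop are made up for illustration.
> {code:java}
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> 
> // Hypothetical sketch, not HDFS code: map the whole block file once and let
> // the OS page cache / read-ahead do the prefetching, instead of issuing a
> // blocking 64 KB read per packet.
> public class MappedBlockReader {
>   public static void main(String[] args) throws IOException {
>     String blockPath = args[0]; // path to a block replica on the datanode
>     try (RandomAccessFile raf = new RandomAccessFile(blockPath, "r");
>          FileChannel ch = raf.getChannel()) {
>       // HDFS blocks are usually <= 128 MB, well under the 2 GB mapping limit.
>       MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
>       mapped.load(); // touch the pages so kernel read-ahead starts in the background
> 
>       byte[] packet = new byte[64 * 1024];
>       while (mapped.hasRemaining()) {
>         int len = Math.min(packet.length, mapped.remaining());
>         mapped.get(packet, 0, len); // copies from the page cache, no per-packet disk wait
>         // ... compute/verify checksums and write `packet` to the socket here ...
>       }
>     }
>   }
> }
> {code}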
>  
> Code: $HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java, line 586
> {code:java}
>   /**
>    * Sends a packet with up to maxChunks chunks of data.
>    * 
>    * @param pkt buffer used for writing packet data
>    * @param maxChunks maximum number of chunks to send
>    * @param out stream to send data to
>    * @param transferTo use transferTo to send data
>    * @param throttler used for throttling data transfer bandwidth
>    */
>   private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
>       boolean transferTo, DataTransferThrottler throttler) throws IOException {
>     int dataLen = (int) Math.min(endOffset - offset,
>                              (chunkSize * (long) maxChunks));
>     
>     int numChunks = numberOfChunks(dataLen); // number of chunks to be sent in the packet
>     int checksumDataLen = numChunks * checksumSize;
>     int packetLen = dataLen + checksumDataLen + 4;
>     boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
> 
>     // The packet buffer is organized as follows:
>     // _______HHHHCCCCD?D?D?D?
>     //        ^   ^
>     //        |   \ checksumOff
>     //        \ headerOff
>     // _ padding, since the header is variable-length
>     // H = header and length prefixes
>     // C = checksums
>     // D? = data, if transferTo is false.
>     
>     int headerLen = writePacketHeader(pkt, dataLen, packetLen);
>     
>     // Per above, the header doesn't start at the beginning of the
>     // buffer
>     int headerOff = pkt.position() - headerLen;
>     
>     int checksumOff = pkt.position();
>     byte[] buf = pkt.array();
>     
>     if (checksumSize > 0 && checksumIn != null) {
>       readChecksum(buf, checksumOff, checksumDataLen);
> 
>       // write in progress that we need to use to get last checksum
>       if (lastDataPacket && lastChunkChecksum != null) {
>         int start = checksumOff + checksumDataLen - checksumSize;
>         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
>         
>         if (updatedChecksum != null) {
>           System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
>         }
>       }
>     }
>     
>     int dataOff = checksumOff + checksumDataLen;
>     if (!transferTo) { // normal transfer
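>       // (Per the report above: this readFully is the synchronous per-packet
>       // read from the local file system seen in the XTrace workflow; the
>       // datanode blocks here for each packet, ~64 KB in the benchmark.)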
>       IOUtils.readFully(blockIn, buf, dataOff, dataLen);      
>       if (verifyChecksum) {
>         verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
>       }
>     }
>     
>     try {
>       if (transferTo) {
>         SocketOutputStream sockOut = (SocketOutputStream)out;
>         // First write header and checksums
>         sockOut.write(buf, headerOff, dataOff - headerOff);
>         
>         // no need to flush since we know out is not a buffered stream
>         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
>         LongWritable waitTime = new LongWritable();
>         LongWritable transferTime = new LongWritable();
>         sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
>             waitTime, transferTime);
>         
>         datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
>         datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
>         blockInPosition += dataLen;
>       } else {
>         // normal transfer
>         out.write(buf, headerOff, dataOff + dataLen - headerOff);
>       }
>     } catch (IOException e) {
>       if (e instanceof SocketTimeoutException) {
>         /*
>          * writing to client timed out.  This happens if the client reads
>          * part of a block and then decides not to read the rest (but leaves
>          * the socket open).
>          * 
>          * Reporting of this case is done in DataXceiver#run
>          */
>       } else {
>         /* Exception while writing to the client. Connection closure from
>          * the other end is mostly the case and we do not care much about
>          * it. But other things can go wrong, especially in transferTo(),
>          * which we do not want to ignore.
>          *
>          * The message parsing below should not be considered as a good
>          * coding example. NEVER do it to drive a program logic. NEVER.
>          * It was done here because the NIO throws an IOException for EPIPE.
>          */
>         String ioem = e.getMessage();
>         if (!ioem.startsWith("Broken pipe")
>             && !ioem.startsWith("Connection reset")) {
>           LOG.error("BlockSender.sendChunks() exception: ", e);
>         }
>         datanode.getBlockScanner().markSuspectBlock(
>               volumeRef.getVolume().getStorageID(),
>               block);
>       }
>       throw ioeToSocketException(e);
>     }
> 
>     if (throttler != null) { // rebalancing so throttle
>       throttler.throttle(packetLen);
>     }
>     
>     /* Retro throttle */
>     { throttlingpoint.throttle(); }
> 
>     return dataLen;
>   }
>   
>   /**
>    * Read checksum into given buffer
>    * @param buf buffer to read the checksum into
>    * @param checksumOffset offset at which to write the checksum into buf
>    * @param checksumLen length of checksum to write
>    * @throws IOException on error
>    */
>   private void readChecksum(byte[] buf, final int checksumOffset,
>       final int checksumLen) throws IOException {
>     if (checksumSize <= 0 && checksumIn == null) {
>       return;
>     }
>     try {
>       checksumIn.readFully(buf, checksumOffset, checksumLen);
>     } catch (IOException e) {
>       LOG.warn(" Could not read or failed to verify checksum for data"
>           + " at offset " + offset + " for block " + block, e);
>       IOUtils.closeStream(checksumIn);
>       checksumIn = null;
>       if (corruptChecksumOk) {
>         if (checksumOffset < checksumLen) {
>           // Just fill the array with zeros.
>           Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
>         }
>       } else {
>         throw e;
>       }
>     }
>   }
> {code}
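> As a rough illustration of suggestion (2), the sketch below overlaps the next 
> disk read with processing of the current packet using AsynchronousFileChannel. 
> Again, this is a hypothetical standalone sketch (names like AsyncBlockReader 
> are made up for illustration), not a proposed change to sendPacket itself.
> {code:java}
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.AsynchronousFileChannel;
> import java.nio.file.Paths;
> import java.nio.file.StandardOpenOption;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
> 
> // Hypothetical sketch, not HDFS code: issue the read for packet N+1 before
> // packet N is checksummed and written to the socket, so the disk read
> // overlaps with the network write instead of alternating with it.
> public class AsyncBlockReader {
>   public static void main(String[] args)
>       throws IOException, InterruptedException, ExecutionException {
>     try (AsynchronousFileChannel ch = AsynchronousFileChannel.open(
>             Paths.get(args[0]), StandardOpenOption.READ)) {
>       final int packetSize = 64 * 1024;
>       long fileSize = ch.size();
>       long offset = 0;
> 
>       ByteBuffer current = ByteBuffer.allocate(packetSize);
>       Future<Integer> pending = ch.read(current, offset); // first read in flight
>       while (offset < fileSize) {
>         int n = pending.get(); // wait only for the read issued one packet ago
>         if (n <= 0) {
>           break;
>         }
>         offset += n;
>         ByteBuffer next = ByteBuffer.allocate(packetSize);
>         if (offset < fileSize) {
>           pending = ch.read(next, offset); // start the next read before processing
>         }
>         current.flip();
>         // ... verify checksums and write `current` to the client here ...
>         current = next;
>       }
>     }
>   }
> }
> {code}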
>  
> XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
