[ https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mania Abdi updated HDFS-15206:
------------------------------
    Description: 
Hello everyone,

I ran a simple benchmark that runs ``` hadoop fs -get /file1.txt ```, where 
file1.txt is 1MB in size, and captured the workflow of requests using XTrace. 
From the workflow trace, I noticed that the datanode issues 64KB synchronous 
read requests to the local file system, sends the data back, and waits for 
completion. I walked through the HDFS code and confirmed this pattern. I have 
two suggestions. (1) Since the HDFS block size is usually 128MB, we could map 
the block file into memory (mmap via the FileChannel class) so that the file 
system can prefetch and read ahead in the background, instead of reading 
synchronously from disk. (2) We could use asynchronous read operations against 
the local disk of the datanode. Is there a reason behind the synchronous reads 
from the file system?
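
To make the two suggestions concrete, here is a minimal sketch in plain Java NIO. It is not HDFS code: the class name LocalReadSketch, the method names, and the 64KB slice size are assumptions for illustration, and the copy into the output array stands in for sending a packet to the client. readMapped maps the file once and copies slices out of the mapping; readAsync keeps one read in flight while the previous slice is consumed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical illustration only; none of these names exist in HDFS.
class LocalReadSketch {
  static final int PACKET_BYTES = 64 * 1024; // assumed slice size (64KB)

  /** Suggestion (1): map the file once, then copy 64KB slices from the mapping. */
  static byte[] readMapped(Path file) throws IOException {
    try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
      long size = ch.size(); // a single mapping is limited to Integer.MAX_VALUE bytes
      MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, 0, size);
      map.load(); // best-effort hint to bring the mapped pages into memory
      byte[] out = new byte[(int) size];
      int off = 0;
      while (map.hasRemaining()) {
        int n = Math.min(PACKET_BYTES, map.remaining());
        map.get(out, off, n); // stands in for "send one packet"
        off += n;
      }
      return out;
    }
  }

  /** Suggestion (2): issue the next read before consuming the current slice. */
  static byte[] readAsync(Path file)
      throws IOException, InterruptedException, ExecutionException {
    try (AsynchronousFileChannel ch =
        AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
      long size = ch.size();
      byte[] out = new byte[(int) size];
      ByteBuffer current = ByteBuffer.allocate(PACKET_BYTES);
      ByteBuffer next = ByteBuffer.allocate(PACKET_BYTES);
      long pos = 0;
      Future<Integer> pending = size > 0 ? ch.read(current, pos) : null;
      while (pos < size) {
        int n = pending.get(); // wait for the read that is already in flight
        if (n <= 0) {
          break; // unexpected end of file
        }
        long nextPos = pos + n;
        // Start reading the next slice before the current one is consumed.
        Future<Integer> nextPending =
            nextPos < size ? ch.read(next, nextPos) : null;
        current.flip();
        current.get(out, (int) pos, n); // stands in for "send one packet"
        current.clear();
        ByteBuffer tmp = current;
        current = next;
        next = tmp;
        pending = nextPending;
        pos = nextPos;
      }
      return out;
    }
  }

  public static void main(String[] args) throws Exception {
    Path tmp = Files.createTempFile("block", ".bin");
    tmp.toFile().deleteOnExit();
    byte[] data = new byte[1024 * 1024]; // 1MB, as in the benchmark above
    new Random(42).nextBytes(data);
    Files.write(tmp, data);
    System.out.println("mmap matches:  " + Arrays.equals(data, readMapped(tmp)));
    System.out.println("async matches: " + Arrays.equals(data, readAsync(tmp)));
  }
}
```

Both methods return the same bytes as a plain sequential read; the difference is only in when the disk I/O is issued relative to the consumer. Whether either variant helps in the datanode would have to be measured, since the OS page cache may already read ahead for sequential access.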

 

Code: 
$HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
 line 586
{code:java}
  /**
   * Sends a packet with up to maxChunks chunks of data.
   * 
   * @param pkt buffer used for writing packet data
   * @param maxChunks maximum number of chunks to send
   * @param out stream to send data to
   * @param transferTo use transferTo to send data
   * @param throttler used for throttling data transfer bandwidth
   */
  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
      boolean transferTo, DataTransferThrottler throttler) throws IOException {
    int dataLen = (int) Math.min(endOffset - offset,
                             (chunkSize * (long) maxChunks));
    
    // Number of chunks to be sent in the packet
    int numChunks = numberOfChunks(dataLen);
    int checksumDataLen = numChunks * checksumSize;
    int packetLen = dataLen + checksumDataLen + 4;
    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;

    // The packet buffer is organized as follows:
    // _______HHHHCCCCD?D?D?D?
    //        ^   ^
    //        |   \ checksumOff
    //        \ headerOff
    // _ padding, since the header is variable-length
    // H = header and length prefixes
    // C = checksums
    // D? = data, if transferTo is false.
    
    int headerLen = writePacketHeader(pkt, dataLen, packetLen);
    
    // Per above, the header doesn't start at the beginning of the
    // buffer
    int headerOff = pkt.position() - headerLen;
    
    int checksumOff = pkt.position();
    byte[] buf = pkt.array();
    
    if (checksumSize > 0 && checksumIn != null) {
      readChecksum(buf, checksumOff, checksumDataLen);

      // write in progress that we need to use to get last checksum
      if (lastDataPacket && lastChunkChecksum != null) {
        int start = checksumOff + checksumDataLen - checksumSize;
        byte[] updatedChecksum = lastChunkChecksum.getChecksum();
        
        if (updatedChecksum != null) {
          System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
        }
      }
    }
    
    int dataOff = checksumOff + checksumDataLen;
    if (!transferTo) { // normal transfer
      IOUtils.readFully(blockIn, buf, dataOff, dataLen);      
      if (verifyChecksum) {
        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
      }
    }
    
    try {
      if (transferTo) {
        SocketOutputStream sockOut = (SocketOutputStream)out;
        // First write header and checksums
        sockOut.write(buf, headerOff, dataOff - headerOff);
        
        // no need to flush since we know out is not a buffered stream
        FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
        LongWritable waitTime = new LongWritable();
        LongWritable transferTime = new LongWritable();
        sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
            waitTime, transferTime);
        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
        blockInPosition += dataLen;
      } else {
        // normal transfer
        out.write(buf, headerOff, dataOff + dataLen - headerOff);
      }
    } catch (IOException e) {
      if (e instanceof SocketTimeoutException) {
        /*
         * writing to client timed out.  This happens if the client reads
         * part of a block and then decides not to read the rest (but leaves
         * the socket open).
         * 
         * Reporting of this case is done in DataXceiver#run
         */
      } else {
        /* Exception while writing to the client. Connection closure from
         * the other end is mostly the case and we do not care much about
         * it. But other things can go wrong, especially in transferTo(),
         * which we do not want to ignore.
         *
         * The message parsing below should not be considered as a good
         * coding example. NEVER do it to drive a program logic. NEVER.
         * It was done here because the NIO throws an IOException for EPIPE.
         */
        String ioem = e.getMessage();
        if (!ioem.startsWith("Broken pipe")
            && !ioem.startsWith("Connection reset")) {
          LOG.error("BlockSender.sendChunks() exception: ", e);
        }
        datanode.getBlockScanner().markSuspectBlock(
              volumeRef.getVolume().getStorageID(),
              block);
      }
      throw ioeToSocketException(e);
    }

    if (throttler != null) { // rebalancing so throttle
      throttler.throttle(packetLen);
    }
    
    /* Retro throttle */
    { throttlingpoint.throttle(); }

    return dataLen;
  }
  
  /**
   * Read checksum into given buffer
   * @param buf buffer to read the checksum into
   * @param checksumOffset offset at which to write the checksum into buf
   * @param checksumLen length of checksum to write
   * @throws IOException on error
   */
  private void readChecksum(byte[] buf, final int checksumOffset,
      final int checksumLen) throws IOException {
    if (checksumSize <= 0 && checksumIn == null) {
      return;
    }
    try {
      checksumIn.readFully(buf, checksumOffset, checksumLen);
    } catch (IOException e) {
      LOG.warn(" Could not read or failed to veirfy checksum for data"
          + " at offset " + offset + " for block " + block, e);
      IOUtils.closeStream(checksumIn);
      checksumIn = null;
      if (corruptChecksumOk) {
        if (checksumOffset < checksumLen) {
          // Just fill the array with zeros.
          Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
        }
      } else {
        throw e;
      }
    }
  }

{code}
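
The size arithmetic at the top of sendPacket can be worked through with concrete numbers. The sketch below is a standalone illustration, not part of BlockSender: the class name PacketSizeSketch is hypothetical, numberOfChunks is assumed to round up, and the values used in main (512-byte chunks, 4-byte checksums, 128 chunks per packet) are assumptions chosen to reproduce the 64KB reads seen in the trace.

```java
// Hypothetical illustration of the dataLen / numChunks / packetLen arithmetic.
class PacketSizeSketch {
  /** Assumed to round up, so a partial trailing chunk still gets a checksum. */
  static int numberOfChunks(long dataLen, int chunkSize) {
    return (int) ((dataLen + chunkSize - 1) / chunkSize);
  }

  /** Mirrors the first lines of sendPacket for one packet. */
  static int packetLen(long remaining, int chunkSize, int checksumSize,
      int maxChunks) {
    int dataLen = (int) Math.min(remaining, chunkSize * (long) maxChunks);
    int numChunks = numberOfChunks(dataLen, chunkSize);
    int checksumDataLen = numChunks * checksumSize;
    return dataLen + checksumDataLen + 4; // 4 bytes for the length prefix
  }

  /** Number of packets (and therefore synchronous reads) for a whole file. */
  static long packetsFor(long fileLen, int chunkSize, int maxChunks) {
    long perPacket = chunkSize * (long) maxChunks;
    return (fileLen + perPacket - 1) / perPacket;
  }

  public static void main(String[] args) {
    int chunkSize = 512;    // assumed bytes per checksum chunk
    int checksumSize = 4;   // assumed checksum width in bytes
    int maxChunks = 128;    // assumed, gives 64KB of data per packet
    long fileLen = 1024 * 1024; // the 1MB file from the benchmark
    System.out.println(packetLen(fileLen, chunkSize, checksumSize, maxChunks));
    System.out.println(packetsFor(fileLen, chunkSize, maxChunks));
  }
}
```

Under these assumed values a full packet is 65536 + 512 + 4 = 66052 bytes, and the 1MB file is served as 16 packets, each preceded by its own read from the local file system.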
 
XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]

 


> HDFS synchronous reads from local file system
> ---------------------------------------------
>
>                 Key: HDFS-15206
>                 URL: https://issues.apache.org/jira/browse/HDFS-15206
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>         Environment: !Screenshot from 2020-03-03 22-07-26.png!
>            Reporter: Mania Abdi
>            Priority: Major
>         Attachments: Screenshot from 2020-03-03 22-07-26.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
