[
https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mania Abdi updated HDFS-15206:
------------------------------
Priority: Minor (was: Major)
> HDFS synchronous reads from local file system
> ---------------------------------------------
>
> Key: HDFS-15206
> URL: https://issues.apache.org/jira/browse/HDFS-15206
> Project: Hadoop HDFS
> Issue Type: Improvement
> Environment: !Screenshot from 2020-03-03 22-07-26.png!
> Reporter: Mania Abdi
> Priority: Minor
> Attachments: Screenshot from 2020-03-03 22-07-26.png
>
>
> Hello everyone,
> I ran a simple benchmark that runs ```hadoop fs -get /file1.txt```, where
> file1.txt is 1 MB in size, and captured the workflow of requests using XTrace.
> The trace shows that the datanode issues synchronous 64 KB read requests to
> the local file system to read the data, sends the data back, and waits for
> completion. A code walk through the HDFS source confirmed this pattern. I
> have two suggestions: (1) since an HDFS block is usually 128 MB, we could
> mmap the file via the FileChannel class to load it into memory and let the
> file system prefetch and read ahead in the background, instead of reading
> synchronously from disk; (2) we could use asynchronous read operations
> against the datanode's local disk. Is there a reason behind the synchronous
> reads from the file system?
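> The first suggestion (mmap via FileChannel) could look roughly like the
> sketch below. This is a minimal illustration, not DataNode code:
> {{readViaMmap}}, the temp-file setup, and the sizes are hypothetical; the
> readahead benefit comes from the mapping plus {{MappedByteBuffer.load()}}.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapReadSketch {
    // Hypothetical helper: map the whole block file once and serve each
    // 64KB packet from the mapping, instead of a synchronous read per packet.
    static byte[] readViaMmap(Path blockFile, int offset, int len) throws IOException {
        try (FileChannel ch = FileChannel.open(blockFile, StandardOpenOption.READ)) {
            MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            // Ask the OS to fault the pages in, enabling prefetch/readahead.
            map.load();
            byte[] out = new byte[len];
            map.position(offset);
            map.get(out);
            return out;
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the 1MB file1.txt from the benchmark.
        Path tmp = Files.createTempFile("block", ".dat");
        byte[] data = new byte[1 << 20];
        for (int i = 0; i < data.length; i++) data[i] = (byte) (i & 0xFF);
        Files.write(tmp, data);
        byte[] chunk = readViaMmap(tmp, 64 * 1024, 64 * 1024); // one 64KB "packet"
        System.out.println(chunk.length);
        Files.delete(tmp);
    }
}
```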
>
> Code:
> $HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
> line 586
> {code:java}
> /**
>  * Sends a packet with up to maxChunks chunks of data.
>  *
>  * @param pkt buffer used for writing packet data
>  * @param maxChunks maximum number of chunks to send
>  * @param out stream to send data to
>  * @param transferTo use transferTo to send data
>  * @param throttler used for throttling data transfer bandwidth
>  */
> private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
>     boolean transferTo, DataTransferThrottler throttler) throws IOException {
>   int dataLen = (int) Math.min(endOffset - offset,
>       (chunkSize * (long) maxChunks));
>   int numChunks = numberOfChunks(dataLen); // Number of chunks to be sent in the packet
>   int checksumDataLen = numChunks * checksumSize;
>   int packetLen = dataLen + checksumDataLen + 4;
>   boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
>
>   // The packet buffer is organized as follows:
>   // _______HHHHCCCCD?D?D?D?
>   //        ^   ^
>   //        |   \ checksumOff
>   //        \ headerOff
>   // _ padding, since the header is variable-length
>   // H = header and length prefixes
>   // C = checksums
>   // D? = data, if transferTo is false.
>
>   int headerLen = writePacketHeader(pkt, dataLen, packetLen);
>
>   // Per above, the header doesn't start at the beginning of the buffer
>   int headerOff = pkt.position() - headerLen;
>
>   int checksumOff = pkt.position();
>   byte[] buf = pkt.array();
>
>   if (checksumSize > 0 && checksumIn != null) {
>     readChecksum(buf, checksumOff, checksumDataLen);
>
>     // write in progress that we need to use to get last checksum
>     if (lastDataPacket && lastChunkChecksum != null) {
>       int start = checksumOff + checksumDataLen - checksumSize;
>       byte[] updatedChecksum = lastChunkChecksum.getChecksum();
>       if (updatedChecksum != null) {
>         System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
>       }
>     }
>   }
>
>   int dataOff = checksumOff + checksumDataLen;
>   if (!transferTo) { // normal transfer
>     IOUtils.readFully(blockIn, buf, dataOff, dataLen);
>     if (verifyChecksum) {
>       verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
>     }
>   }
>
>   try {
>     if (transferTo) {
>       SocketOutputStream sockOut = (SocketOutputStream) out;
>       // First write header and checksums
>       sockOut.write(buf, headerOff, dataOff - headerOff);
>
>       // no need to flush since we know out is not a buffered stream
>       FileChannel fileCh = ((FileInputStream) blockIn).getChannel();
>       LongWritable waitTime = new LongWritable();
>       LongWritable transferTime = new LongWritable();
>       sockOut.transferToFully(fileCh, blockInPosition, dataLen,
>           waitTime, transferTime);
>       datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
>       datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
>       blockInPosition += dataLen;
>     } else {
>       // normal transfer
>       out.write(buf, headerOff, dataOff + dataLen - headerOff);
>     }
>   } catch (IOException e) {
>     if (e instanceof SocketTimeoutException) {
>       /*
>        * writing to client timed out. This happens if the client reads
>        * part of a block and then decides not to read the rest (but leaves
>        * the socket open).
>        *
>        * Reporting of this case is done in DataXceiver#run
>        */
>     } else {
>       /* Exception while writing to the client. Connection closure from
>        * the other end is mostly the case and we do not care much about
>        * it. But other things can go wrong, especially in transferTo(),
>        * which we do not want to ignore.
>        *
>        * The message parsing below should not be considered as a good
>        * coding example. NEVER do it to drive a program logic. NEVER.
>        * It was done here because the NIO throws an IOException for EPIPE.
>        */
>       String ioem = e.getMessage();
>       if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
>         LOG.error("BlockSender.sendChunks() exception: ", e);
>       }
>       datanode.getBlockScanner().markSuspectBlock(
>           volumeRef.getVolume().getStorageID(),
>           block);
>     }
>     throw ioeToSocketException(e);
>   }
>
>   if (throttler != null) { // rebalancing so throttle
>     throttler.throttle(packetLen);
>   }
>
>   /* Retro throttle */
>   throttlingpoint.throttle();
>
>   return dataLen;
> }
>
> /**
>  * Read checksum into given buffer
>  * @param buf buffer to read the checksum into
>  * @param checksumOffset offset at which to write the checksum into buf
>  * @param checksumLen length of checksum to write
>  * @throws IOException on error
>  */
> private void readChecksum(byte[] buf, final int checksumOffset,
>     final int checksumLen) throws IOException {
>   if (checksumSize <= 0 && checksumIn == null) {
>     return;
>   }
>   try {
>     checksumIn.readFully(buf, checksumOffset, checksumLen);
>   } catch (IOException e) {
>     LOG.warn(" Could not read or failed to verify checksum for data"
>         + " at offset " + offset + " for block " + block, e);
>     IOUtils.closeStream(checksumIn);
>     checksumIn = null;
>     if (corruptChecksumOk) {
>       if (checksumOffset < checksumLen) {
>         // Just fill the array with zeros.
>         Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
>       }
>     } else {
>       throw e;
>     }
>   }
> }
> {code}
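> The second suggestion (asynchronous reads from local disk) could be sketched
> with java.nio's {{AsynchronousFileChannel}}. Again this is illustrative only,
> not a patch against BlockSender; the idea is that the sender thread could
> overlap packet preparation with the in-flight disk read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AsyncReadSketch {
    public static void main(String[] args)
            throws IOException, InterruptedException, ExecutionException {
        // Stand-in for a block file on the datanode's local disk.
        Path tmp = Files.createTempFile("block", ".dat");
        Files.write(tmp, new byte[128 * 1024]);
        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(tmp, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
            // Issue the 64KB read without blocking the calling thread.
            Future<Integer> pending = ch.read(buf, 0);
            // ... the sender could build the packet header and checksums
            // here while the disk read is in flight ...
            int n = pending.get(); // wait only when the data is actually needed
            System.out.println(n);
        }
        Files.delete(tmp);
    }
}
```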
>
> XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]