[ https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mania Abdi updated HDFS-15206:
------------------------------
    Priority: Minor  (was: Major)

> HDFS synchronous reads from local file system
> ---------------------------------------------
>
>                 Key: HDFS-15206
>                 URL: https://issues.apache.org/jira/browse/HDFS-15206
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>         Environment: !Screenshot from 2020-03-03 22-07-26.png!
>            Reporter: Mania Abdi
>            Priority: Minor
>         Attachments: Screenshot from 2020-03-03 22-07-26.png
>
>
> Hello everyone,
>
> I ran a simple benchmark that runs ```hadoop fs -get /file1.txt```, where
> file1.txt is 1 MB in size, and captured the workflow of requests using
> XTrace. Evaluating the workflow trace, I noticed that the datanode issues
> synchronous 64 KB read requests to the local file system, sends the data
> back, and waits for completion. I walked through the HDFS code to verify
> this pattern, and it was correct. I have two suggestions: (1) since an HDFS
> block is usually 128 MB, we could use mmap via the FileChannel class to map
> the file into memory and let the file system prefetch and read ahead in the
> background, instead of reading synchronously from disk; (2) use asynchronous
> read operations against the datanode's local disk. I was wondering whether
> there is a reason behind the synchronous reads from the file system?
>
> Code:
> $HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
> line 586
> {code:java}
> /**
>  * Sends a packet with up to maxChunks chunks of data.
>  *
>  * @param pkt buffer used for writing packet data
>  * @param maxChunks maximum number of chunks to send
>  * @param out stream to send data to
>  * @param transferTo use transferTo to send data
>  * @param throttler used for throttling data transfer bandwidth
>  */
> private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
>     boolean transferTo, DataTransferThrottler throttler) throws IOException {
>   int dataLen = (int) Math.min(endOffset - offset,
>       (chunkSize * (long) maxChunks));
>
>   int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
>   int checksumDataLen = numChunks * checksumSize;
>   int packetLen = dataLen + checksumDataLen + 4;
>   boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
>
>   // The packet buffer is organized as follows:
>   // _______HHHHCCCCD?D?D?D?
>   //        ^   ^
>   //        |   \ checksumOff
>   //        \ headerOff
>   // _ padding, since the header is variable-length
>   // H = header and length prefixes
>   // C = checksums
>   // D? = data, if transferTo is false.
>
>   int headerLen = writePacketHeader(pkt, dataLen, packetLen);
>
>   // Per above, the header doesn't start at the beginning of the
>   // buffer
>   int headerOff = pkt.position() - headerLen;
>
>   int checksumOff = pkt.position();
>   byte[] buf = pkt.array();
>
>   if (checksumSize > 0 && checksumIn != null) {
>     readChecksum(buf, checksumOff, checksumDataLen);
>
>     // write in progress that we need to use to get last checksum
>     if (lastDataPacket && lastChunkChecksum != null) {
>       int start = checksumOff + checksumDataLen - checksumSize;
>       byte[] updatedChecksum = lastChunkChecksum.getChecksum();
>
>       if (updatedChecksum != null) {
>         System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
>       }
>     }
>   }
>
>   int dataOff = checksumOff + checksumDataLen;
>   if (!transferTo) { // normal transfer
>     IOUtils.readFully(blockIn, buf, dataOff, dataLen);
>     if (verifyChecksum) {
>       verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
>     }
>   }
>
>   try {
>     if (transferTo) {
>       SocketOutputStream sockOut = (SocketOutputStream)out;
>       // First write header and checksums
>       sockOut.write(buf, headerOff, dataOff - headerOff);
>
>       // no need to flush since we know out is not a buffered stream
>       FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
>       LongWritable waitTime = new LongWritable();
>       LongWritable transferTime = new LongWritable();
>       sockOut.transferToFully(fileCh, blockInPosition, dataLen,
>           waitTime, transferTime);
>       datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
>       datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
>       blockInPosition += dataLen;
>     } else {
>       // normal transfer
>       out.write(buf, headerOff, dataOff + dataLen - headerOff);
>     }
>   } catch (IOException e) {
>     if (e instanceof SocketTimeoutException) {
>       /*
>        * writing to client timed out. This happens if the client reads
>        * part of a block and then decides not to read the rest (but leaves
>        * the socket open).
>        *
>        * Reporting of this case is done in DataXceiver#run
>        */
>     } else {
>       /* Exception while writing to the client. Connection closure from
>        * the other end is mostly the case and we do not care much about
>        * it. But other things can go wrong, especially in transferTo(),
>        * which we do not want to ignore.
>        *
>        * The message parsing below should not be considered as a good
>        * coding example. NEVER do it to drive a program logic. NEVER.
>        * It was done here because the NIO throws an IOException for EPIPE.
>        */
>       String ioem = e.getMessage();
>       if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
>         LOG.error("BlockSender.sendChunks() exception: ", e);
>       }
>       datanode.getBlockScanner().markSuspectBlock(
>           volumeRef.getVolume().getStorageID(),
>           block);
>     }
>     throw ioeToSocketException(e);
>   }
>
>   if (throttler != null) { // rebalancing so throttle
>     throttler.throttle(packetLen);
>   }
>
>   /* Retro throttle */
>   { throttlingpoint.throttle(); }
>
>   return dataLen;
> }
>
> /**
>  * Read checksum into given buffer
>  * @param buf buffer to read the checksum into
>  * @param checksumOffset offset at which to write the checksum into buf
>  * @param checksumLen length of checksum to write
>  * @throws IOException on error
>  */
> private void readChecksum(byte[] buf, final int checksumOffset,
>     final int checksumLen) throws IOException {
>   if (checksumSize <= 0 && checksumIn == null) {
>     return;
>   }
>   try {
>     checksumIn.readFully(buf, checksumOffset, checksumLen);
>   } catch (IOException e) {
>     LOG.warn(" Could not read or failed to verify checksum for data"
>         + " at offset " + offset + " for block " + block, e);
>     IOUtils.closeStream(checksumIn);
>     checksumIn = null;
>     if (corruptChecksumOk) {
>       if (checksumOffset < checksumLen) {
>         // Just fill the array with zeros.
>         Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
>       }
>     } else {
>       throw e;
>     }
>   }
> }
> {code}
>
> XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org
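For reference, the first suggestion in the description above (mmap via the FileChannel class) could look roughly like the sketch below. This is a minimal, self-contained illustration under assumptions, not a patch: the class name `MmapReadSketch` and the `readChunk` helper are hypothetical and are not part of BlockSender, and a real change would also have to handle mappings larger than 2 GB, unmapping, and the checksum path.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapReadSketch {
    // Hypothetical replacement for the synchronous IOUtils.readFully(...)
    // call in sendPacket(): map the requested region of the block file and
    // copy the chunk out of the mapping, letting the OS page cache read
    // ahead in the background.
    static byte[] readChunk(FileChannel ch, long offset, int len) throws IOException {
        MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, offset, len);
        byte[] buf = new byte[len];
        map.get(buf);
        return buf;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a 1 MB block file; a real datanode would map the
        // on-disk block replica instead.
        Path block = Files.createTempFile("block", ".dat");
        byte[] data = new byte[1 << 20];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;
        Files.write(block, data);

        try (FileChannel ch = FileChannel.open(block, StandardOpenOption.READ)) {
            // Read one 64 KB packet's worth of data at offset 64 KB.
            byte[] chunk = readChunk(ch, 64 * 1024, 64 * 1024);
            System.out.println(chunk.length);                // 65536
            System.out.println(chunk[0] == data[64 * 1024]); // true
        } finally {
            Files.deleteIfExists(block);
        }
    }
}
```

The difference from the current IOUtils.readFully path is that page faults on the mapping are served by the kernel, which can prefetch subsequent pages, so later 64 KB packets are typically satisfied from the page cache rather than by a blocking disk read.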
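The second suggestion (asynchronous reads from the datanode's local disk) could be sketched as follows, again with a hypothetical temp file standing in for a block replica. The point is the overlap between the disk read and the network write, not the exact API; BlockSender itself does not use AsynchronousFileChannel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AsyncReadSketch {
    public static void main(String[] args)
            throws IOException, InterruptedException, ExecutionException {
        // Stand-in for a block replica on the datanode's local disk.
        Path block = Files.createTempFile("block", ".dat");
        Files.write(block, new byte[256 * 1024]);

        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(block, StandardOpenOption.READ)) {
            // Start reading the next 64 KB chunk without blocking; the
            // datanode could overlap this with sending the previous packet.
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
            Future<Integer> pending = ch.read(buf, 64 * 1024);

            // ... the previous packet would be written to the socket here ...

            // Block only once the data is actually needed.
            int n = pending.get();
            System.out.println(n > 0);
        } finally {
            Files.deleteIfExists(block);
        }
    }
}
```

With the current code, each 64 KB packet pays the full disk latency before any bytes hit the socket; issuing the next read while the previous packet is in flight would hide most of that latency.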