[
https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mania Abdi updated HDFS-15206:
------------------------------
Description:
Hello everyone,
I ran a simple benchmark that runs ``` hadoop fs -get /file1.txt ```, where
file1.txt is 1MB in size, and captured the workflow of requests using XTrace.
By evaluating the workflow trace, I noticed that the datanode issues 64KB
synchronous read requests to the local file system to read the data, sends the
data back, and waits for completion. I walked through the HDFS code to verify
the pattern, and it was correct. I have two suggestions: (1) since the HDFS
block size is usually 128MB, we could use mmap via the FileChannel class to
load the file into memory and enable file system prefetching and look-ahead in
the background, instead of synchronously reading from disk; (2) we could use
asynchronous read operations to the local disk of the datanode. Is there a
reason behind the synchronous reads from the file system?
Code:
$HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
line 586
{code:java}
/**
* Sends a packet with up to maxChunks chunks of data.
*
* @param pkt buffer used for writing packet data
* @param maxChunks maximum number of chunks to send
* @param out stream to send data to
* @param transferTo use transferTo to send data
* @param throttler used for throttling data transfer bandwidth
*/
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler) throws IOException {
int dataLen = (int) Math.min(endOffset - offset,
(chunkSize * (long) maxChunks));
int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the
packet
int checksumDataLen = numChunks * checksumSize;
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
// The packet buffer is organized as follows:
// _______HHHHCCCCD?D?D?D?
// ^ ^
// | \ checksumOff
// \ headerOff
// _ padding, since the header is variable-length
// H = header and length prefixes
// C = checksums
// D? = data, if transferTo is false.
int headerLen = writePacketHeader(pkt, dataLen, packetLen);
// Per above, the header doesn't start at the beginning of the
// buffer
int headerOff = pkt.position() - headerLen;
int checksumOff = pkt.position();
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
readChecksum(buf, checksumOff, checksumDataLen); // write in
progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
}
}
}
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
// normal transfer
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException) {
/*
* writing to client timed out. This happens if the client reads
* part of a block and then decides not to read the rest (but leaves
* the socket open).
*
* Reporting of this case is done in DataXceiver#run
*/
} else {
/* Exception while writing to the client. Connection closure from
* the other end is mostly the case and we do not care much about
* it. But other things can go wrong, especially in transferTo(),
* which we do not want to ignore.
*
* The message parsing below should not be considered as a good
* coding example. NEVER do it to drive a program logic. NEVER.
* It was done here because the NIO throws an IOException for EPIPE.
*/
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection
reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(),
block);
}
throw ioeToSocketException(e);
} if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
}
/* Retro throttle */
{ throttlingpoint.throttle(); } return dataLen;
}
/**
 * Read checksum into given buffer.
 *
 * If the read fails, the checksum stream is closed and cleared. When
 * corrupt checksums are tolerated ({@code corruptChecksumOk}), the
 * checksum region of the buffer is zero-filled instead of failing.
 *
 * @param buf buffer to read the checksum into
 * @param checksumOffset offset at which to write the checksum into buf
 * @param checksumLen length of checksum to write
 * @throws IOException on error, unless {@code corruptChecksumOk} is set
 */
private void readChecksum(byte[] buf, final int checksumOffset,
    final int checksumLen) throws IOException {
  // Either condition alone means there is nothing to read. Using "&&"
  // here would let a null checksumIn fall through and be dereferenced.
  if (checksumSize <= 0 || checksumIn == null) {
    return;
  }
  try {
    checksumIn.readFully(buf, checksumOffset, checksumLen);
  } catch (IOException e) {
    LOG.warn(" Could not read or failed to veirfy checksum for data"
        + " at offset " + offset + " for block " + block, e);
    IOUtils.closeStream(checksumIn);
    checksumIn = null;
    if (corruptChecksumOk) {
      if (checksumLen > 0) {
        // Just fill the checksum region with zeros. Arrays.fill takes an
        // exclusive end index, not a length, so the end of the region is
        // checksumOffset + checksumLen.
        Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen,
            (byte) 0);
      }
    } else {
      throw e;
    }
  }
}
{code}
XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]
was:
Hello everyone,
I ran a simple benchmark that runs ``` hadoop fs -get /file1.txt ```, where
file1.txt is 1MB in size, and captured the workflow of requests using XTrace.
By evaluating the workflow trace, I noticed that the datanode issues 64KB
synchronous read requests to the local file system to read the data, sends the
data back, and waits for completion. I walked through the HDFS code to verify
the pattern, and it was correct. I have two suggestions: (1) since the HDFS
block size is usually 128MB, we could use mmap via the FileChannel class to
load the file into memory and enable file system prefetching and look-ahead in
the background, instead of synchronously reading from disk; (2) we could use
asynchronous read operations to the local disk of the datanode. Is there a
reason behind the synchronous reads from the file system?
Code:
XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]
> HDFS synchronous reads from local file system
> ---------------------------------------------
>
> Key: HDFS-15206
> URL: https://issues.apache.org/jira/browse/HDFS-15206
> Project: Hadoop HDFS
> Issue Type: Improvement
> Environment: !Screenshot from 2020-03-03 22-07-26.png!
> Reporter: Mania Abdi
> Priority: Major
> Attachments: Screenshot from 2020-03-03 22-07-26.png
>
>
> Hello everyone,
> I ran a simple benchmark that runs ``` hadoop fs -get /file1.txt ```, where
> file1.txt is 1MB in size, and captured the workflow of requests using XTrace.
> By evaluating the workflow trace, I noticed that the datanode issues 64KB
> synchronous read requests to the local file system to read the data, sends
> the data back, and waits for completion. I walked through the HDFS code to
> verify the pattern, and it was correct. I have two suggestions: (1) since the
> HDFS block size is usually 128MB, we could use mmap via the FileChannel class
> to load the file into memory and enable file system prefetching and
> look-ahead in the background, instead of synchronously reading from disk;
> (2) we could use asynchronous read operations to the local disk of the
> datanode. Is there a reason behind the synchronous reads from the file
> system?
>
> Code:
> $HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
> line 586
> {code:java}
> /**
> * Sends a packet with up to maxChunks chunks of data.
> *
> * @param pkt buffer used for writing packet data
> * @param maxChunks maximum number of chunks to send
> * @param out stream to send data to
> * @param transferTo use transferTo to send data
> * @param throttler used for throttling data transfer bandwidth
> */
> private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
> boolean transferTo, DataTransferThrottler throttler) throws IOException
> {
> int dataLen = (int) Math.min(endOffset - offset,
> (chunkSize * (long) maxChunks));
>
> int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in
> the packet
> int checksumDataLen = numChunks * checksumSize;
> int packetLen = dataLen + checksumDataLen + 4;
> boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
> // The packet buffer is organized as follows:
> // _______HHHHCCCCD?D?D?D?
> // ^ ^
> // | \ checksumOff
> // \ headerOff
> // _ padding, since the header is variable-length
> // H = header and length prefixes
> // C = checksums
> // D? = data, if transferTo is false.
>
> int headerLen = writePacketHeader(pkt, dataLen, packetLen);
>
> // Per above, the header doesn't start at the beginning of the
> // buffer
> int headerOff = pkt.position() - headerLen;
>
> int checksumOff = pkt.position();
> byte[] buf = pkt.array();
>
> if (checksumSize > 0 && checksumIn != null) {
> readChecksum(buf, checksumOff, checksumDataLen); // write in
> progress that we need to use to get last checksum
> if (lastDataPacket && lastChunkChecksum != null) {
> int start = checksumOff + checksumDataLen - checksumSize;
> byte[] updatedChecksum = lastChunkChecksum.getChecksum();
>
> if (updatedChecksum != null) {
> System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
> }
> }
> }
>
> int dataOff = checksumOff + checksumDataLen;
> if (!transferTo) { // normal transfer
> IOUtils.readFully(blockIn, buf, dataOff, dataLen);
> if (verifyChecksum) {
> verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
> }
> }
>
> try {
> if (transferTo) {
> SocketOutputStream sockOut = (SocketOutputStream)out;
> // First write header and checksums
> sockOut.write(buf, headerOff, dataOff - headerOff);
>
> // no need to flush since we know out is not a buffered stream
> FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
> LongWritable waitTime = new LongWritable();
> LongWritable transferTime = new LongWritable();
> sockOut.transferToFully(fileCh, blockInPosition, dataLen,
> waitTime, transferTime);
>
> datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
> datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
> blockInPosition += dataLen;
> } else {
> // normal transfer
> out.write(buf, headerOff, dataOff + dataLen - headerOff);
> }
> } catch (IOException e) {
> if (e instanceof SocketTimeoutException) {
> /*
> * writing to client timed out. This happens if the client reads
> * part of a block and then decides not to read the rest (but leaves
> * the socket open).
> *
> * Reporting of this case is done in DataXceiver#run
> */
> } else {
> /* Exception while writing to the client. Connection closure from
> * the other end is mostly the case and we do not care much about
> * it. But other things can go wrong, especially in transferTo(),
> * which we do not want to ignore.
> *
> * The message parsing below should not be considered as a good
> * coding example. NEVER do it to drive a program logic. NEVER.
> * It was done here because the NIO throws an IOException for EPIPE.
> */
> String ioem = e.getMessage();
> if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection
> reset")) {
> LOG.error("BlockSender.sendChunks() exception: ", e);
> }
> datanode.getBlockScanner().markSuspectBlock(
> volumeRef.getVolume().getStorageID(),
> block);
> }
> throw ioeToSocketException(e);
> } if (throttler != null) { // rebalancing so throttle
> throttler.throttle(packetLen);
> }
>
> /* Retro throttle */
> { throttlingpoint.throttle(); } return dataLen;
> }
>
> /**
> * Read checksum into given buffer
> * @param buf buffer to read the checksum into
> * @param checksumOffset offset at which to write the checksum into buf
> * @param checksumLen length of checksum to write
> * @throws IOException on error
> */
> private void readChecksum(byte[] buf, final int checksumOffset,
> final int checksumLen) throws IOException {
> if (checksumSize <= 0 && checksumIn == null) {
> return;
> }
> try {
> checksumIn.readFully(buf, checksumOffset, checksumLen);
> } catch (IOException e) {
> LOG.warn(" Could not read or failed to veirfy checksum for data"
> + " at offset " + offset + " for block " + block, e);
> IOUtils.closeStream(checksumIn);
> checksumIn = null;
> if (corruptChecksumOk) {
> if (checksumOffset < checksumLen) {
> // Just fill the array with zeros.
> Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
> }
> } else {
> throw e;
> }
> }
> }
> {code}
>
>
>
>
>
>
>
>
> XTrace: [http://brownsys.github.io/tracing-framework/xtrace/server/]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]