[
https://issues.apache.org/jira/browse/HDFS-15925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Haoze Wu updated HDFS-15925:
----------------------------
Description:
When the datanode is receiving data block packets from an HDFS client and
simultaneously forwarding these packets to a mirror (another datanode), a
single IOException in the datanode’s forwarding path can cause the client to
get stuck for 1 min, without any logging. After 1 min, the client’s log shows a
warning of EOFException and `Slow waitForAckedSeqno took 60106ms
(threshold=30000ms)`.

Normally the datanode informs the client of this error state immediately, and
the client then resends the packets right away; the whole process is very fast.
After careful analysis, we find that the above symptom is due to the lack of
packet-level mirrorError state synchronization in BlockReceiver$PacketResponder:
under a certain concurrency condition, the BlockReceiver$PacketResponder hangs
for 1 min and then exits, without sending the error state to the client.
*Root Cause Analysis*
{code:java}
// hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
class BlockReceiver implements Closeable {
  // ...
  private void handleMirrorOutError(IOException ioe) throws IOException {
    // ...
    if (Thread.interrupted()) {
      throw ioe;
    } else { // encounter an error while writing to mirror
      // continue to run even if can not write to mirror
      // notify client of the error
      // and wait for the client to shut down the pipeline
      mirrorError = true; // line 461
    }
  }

  private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in); // line 528
    // ...
    boolean lastPacketInBlock = header.isLastPacketInBlock(); // line 551
    // First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        // ...
        packetReceiver.mirrorPacketTo(mirrorOut); // line 588
        // ...
      } catch (IOException e) {
        handleMirrorOutError(e); // line 604
      }
    }
    // ...
    return lastPacketInBlock ? -1 : len; // line 849
  }

  void receiveBlock(...) throws IOException {
    // ...
    try {
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup,
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start(); // line 968
      }
      while (receivePacket() >= 0) { /* Receive until the last packet */ } // line 971
      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      // Mark that responder has been closed for future processing
      if (responder != null) {
        ((PacketResponder) responder.getRunnable()).close(); // line 977
        responderClosed = true;
      }
      // ...
    } catch (IOException ioe) { // line 1003
      // ...
    } finally {
      // ...
      if (!responderClosed) { // Data transfer was not complete.
        if (responder != null) {
          // ...
          responder.interrupt(); // line 1046
        }
        // ...
      }
      if (responder != null) {
        try {
          responder.interrupt(); // line 1053
          // ...
        } catch (InterruptedException e) {
          responder.interrupt(); // line 1067
          // ...
        }
        // ...
      }
    }
  }
}
{code}
In the `BlockReceiver.receivePacket` method, if the datanode fails to
forward the packet to the mirror (line 588) due to an IOException, the
exception is handled at line 604, which sets the mirrorError flag at line 461.
According to the comments, the BlockReceiver keeps going in the mirrorError
state, and the client should be notified of the error.

However, jstack shows that the datanode gets stuck in the `DataXceiver`
thread (receiving data block packets from the client) and the
`BlockReceiver$PacketResponder` thread (replying ACK packets to the client). In
particular, the `DataXceiver` thread gets stuck in the loop at line 971, which
is in turn caused by blocking at line 528, meaning that the `lastPacketInBlock`
packet has not arrived and no more packets are coming in.
{code:java}
// hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
class BlockReceiver implements Closeable {
  // ...
  class PacketResponder implements Runnable, Closeable {
    // ...
    public void run() {
      // ...
      while (isRunning() && !lastPacketInBlock) {
        // ...
        try {
          // ...
          PipelineAck ack = new PipelineAck();
          // ...
          try {
            if (... && !mirrorError) { // line 1381
              // ...
              // read an ack from downstream datanode
              ack.readFields(downstreamIn); // line 1384
              // ...
            }
            // ...
          } catch (InterruptedException ine) {
            isInterrupted = true; // line 1434
          } catch (IOException ioe) {
            if (Thread.interrupted()) {
              isInterrupted = true; // line 1437
            } else ...
          }
          if (Thread.interrupted() || isInterrupted) { // line 1458
            // ...
            LOG.info(myString + ": Thread is interrupted.");
            running = false;
            continue; // line 1472
          }
          // ...
          sendAckUpstream(ack, expected, totalAckTimeNanos, // line 1481
              (pkt != null ? pkt.offsetInBlock : 0),
              PipelineAck.combineHeader(datanode.getECN(), myStatus));
          // ...
        } catch (IOException e) {
          // ...
        } catch (Throwable e) {
          // ...
        }
      }
      LOG.info(myString + " terminating");
    }

    private void sendAckUpstream(...) throws IOException {
      try {
        // ...
        try {
          if (!running) return;
          sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, // line 1568
              offsetInBlock, myHeader);
        } finally {
          // ...
        }
      } catch (InterruptedException ie) {
        // ...
      }
    }

    private void sendAckUpstreamUnprotected(...) throws IOException {
      final int[] replies;
      if (ack == null) {
        // ...
        replies = new int[] { myHeader };
      } else if (mirrorError) { // ack read error
        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
        replies = new int[] { h, h1 }; // line 1602
      } else {
        // ...
      }
      PipelineAck replyAck = new PipelineAck(seqno, replies,
          totalAckTimeNanos);
      // ...
      replyAck.write(upstreamOut); // line 1632
      // ...
    }
  }
}
{code}
The `BlockReceiver$PacketResponder` thread checks the mirrorError flag at
line 1381. The `DataXceiver` thread runs concurrently. If
`BlockReceiver$PacketResponder` finds that mirrorError is false, it tries to read
the ACK packet from downstream (the mirror, another datanode) at line 1384,
which is a blocking call.

However, there is a race condition. If the mirrorError flag set by the
`handleMirrorOutError` method is noticed by the check at line 1381, the
`BlockReceiver$PacketResponder` thread skips the blocking network I/O
call at line 1384. Instead, it proceeds to line 1481, then 1568, then
1632. According to the code around line 1602, this ACK contains `Status.ERROR`,
which warns the client. Conversely, if the mirrorError flag is set only
after the check at line 1381, the `BlockReceiver$PacketResponder` thread blocks
at line 1384. In our scenario, a data block packet was never sent to the
mirror datanode due to the IOException, so the corresponding ACK packet will
never be sent by the mirror datanode either. Therefore, the
`BlockReceiver$PacketResponder` thread stays blocked there for a long time.
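The following minimal, standalone sketch (not HDFS code; all names are
illustrative) reproduces the check-then-block pattern described above: a
responder thread checks a volatile error flag and, finding it clear, enters a
blocking read; the flag is set immediately afterwards, but since no data will
ever arrive, the read blocks indefinitely.
{code:java}
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Standalone demonstration of the check-then-block race; not HDFS code.
public class CheckThenBlockRace {
  static volatile boolean mirrorError = false;

  public static void main(String[] args) throws Exception {
    PipedOutputStream downstreamSide = new PipedOutputStream();
    PipedInputStream downstreamIn = new PipedInputStream(downstreamSide);

    Thread responder = new Thread(() -> {
      try {
        if (!mirrorError) {              // analogous to the check at line 1381
          // Nothing will ever be written to this stream (the "packet" was never
          // forwarded), so this read blocks forever once the check has passed.
          downstreamIn.read();           // analogous to ack.readFields() at line 1384
        } else {
          System.out.println("mirrorError observed; would send Status.ERROR upstream");
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    });
    responder.setDaemon(true);           // let the JVM exit despite the stuck thread
    responder.start();

    Thread.sleep(100);                   // let the responder pass the check first
    mirrorError = true;                  // too late: the responder is already blocked
    responder.join(3000);
    System.out.println("responder still blocked after flag set: " + responder.isAlive());
  }
}
{code}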
*Fix*

The key is to avoid the problematic concurrency between
`BlockReceiver#receivePacket` and the reading of the ACK packet (from the
downstream mirror datanode) in `BlockReceiver$PacketResponder`. The simplest way
to do this is that, every time `BlockReceiver#receivePacket` successfully
forwards a packet to the downstream mirror datanode, it grants one chance for
`BlockReceiver$PacketResponder` to check the mirrorError state and read the ACK
with the blocking I/O call. This is reasonable because, if the datanode has not
sent the packet, it is impossible for the `BlockReceiver$PacketResponder` to
get the corresponding ACK.

The implementation only needs a semaphore in
`BlockReceiver$PacketResponder` and does not affect the other components.
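A minimal sketch of one possible arrangement (illustrative only, not the actual
patch; the class and method names such as `MirrorAckGate` and `onPacketMirrored`
are hypothetical): the receiving path releases one permit per packet it actually
forwarded, and the responder acquires a permit before the blocking ACK read, so
it never blocks waiting for an ACK that can never arrive.
{code:java}
import java.util.concurrent.Semaphore;

// Illustrative sketch only, not the actual HDFS patch; all names are hypothetical.
class MirrorAckGate {
  private final Semaphore packetsMirrored = new Semaphore(0);
  private volatile boolean mirrorError = false;

  // Receiving path: call after packetReceiver.mirrorPacketTo(mirrorOut) succeeds
  // (conceptually, right after line 588).
  void onPacketMirrored() {
    packetsMirrored.release();
  }

  // Receiving path: call when forwarding to the mirror fails
  // (conceptually, where line 461 sets mirrorError).
  void onMirrorError() {
    mirrorError = true;
    packetsMirrored.release(); // wake the responder so it can observe the flag
  }

  // Responder: call before the blocking ACK read (conceptually, guarding line 1384).
  // Returns true only when a packet was actually forwarded, i.e. an ACK can arrive.
  boolean mayReadDownstreamAck() throws InterruptedException {
    packetsMirrored.acquire();  // one chance per forwarded packet
    return !mirrorError;        // re-check the flag after waking up
  }
}
{code}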
*Reproduction*

Start HDFS with the default configuration. Then run a client (we used
the command `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt` in the terminal).
For each data block packet the client sends to the datanode, the datanode
forwards it at line 588 in `BlockReceiver.java`
([https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java#L588]).
Inject one single IOException there.

Most of the time, the concurrency condition that triggers this bug does not
occur. The reliable way we use to reproduce it is to set `dfs.blocksize` to
`1m` in `hdfs-site.xml`. Then run `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt`,
where `./foo.txt` is a 15MB file (generated with `fallocate -l 15000000 foo.txt`).
Then perform the aforementioned injection at the 12th occurrence of
[https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java#L747].
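For illustration only, a hand-patched injection of this kind could look roughly
like the following sketch (our own assumption, not the tooling actually used for
this report; the helper name, counter, and trigger value are hypothetical, and a
simple call counter at the forwarding site only approximates the trigger timing
rather than matching the 12th occurrence of the DataXceiver line exactly).
{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for a hand-patched fault injection, for testing only.
// Calling maybeInject() just before packetReceiver.mirrorPacketTo(mirrorOut)
// (BlockReceiver.java line 588) throws a single IOException on the n-th call,
// which handleMirrorOutError (line 604) then turns into mirrorError = true.
final class MirrorFaultInjector {
  private static final AtomicInteger CALLS = new AtomicInteger();
  private static final int TRIGGER_ON_CALL = 12; // arbitrary trigger point

  private MirrorFaultInjector() { }

  static void maybeInject() throws IOException {
    if (CALLS.incrementAndGet() == TRIGGER_ON_CALL) {
      throw new IOException("injected mirror write failure");
    }
  }
}
{code}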
> The lack of packet-level mirrorError state synchronization in
> BlockReceiver$PacketResponder can cause the HDFS client to hang
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: HDFS-15925
> URL: https://issues.apache.org/jira/browse/HDFS-15925
> Project: Hadoop HDFS
> Issue Type: Bug
> Components: datanode
> Affects Versions: 3.2.2
> Reporter: Haoze Wu
> Priority: Critical
>