[ 
https://issues.apache.org/jira/browse/HDFS-15925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoze Wu updated HDFS-15925:
----------------------------
    Description: 
    When a datanode is receiving data block packets from an HDFS client and 
simultaneously forwarding them to a mirror (another datanode), a single 
IOException in the datanode’s forwarding path can cause the client to get 
stuck for 1 minute, without any logging. After 1 minute, the client’s log 
shows a warning of EOFException and `Slow waitForAckedSeqno took 60106ms 
(threshold=30000ms)`.

    Normally the datanode informs the client of this error state immediately, 
and the client then resends the packets right away; the whole process is very 
fast. After careful analysis, we find that the above symptom is due to the 
lack of packet-level mirrorError state synchronization in 
BlockReceiver$PacketResponder: under a certain concurrency condition, the 
BlockReceiver$PacketResponder hangs for 1 minute and then exits, without 
sending the error state to the client.

*Root Cause Analysis* 
{code:java}
//hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

class BlockReceiver implements Closeable {
  // ...

  private void handleMirrorOutError(IOException ioe) throws IOException {
    // ...
    if (Thread.interrupted()) {
      throw ioe;
    } else { // encounter an error while writing to mirror
      // continue to run even if can not write to mirror
      // notify client of the error
      // and wait for the client to shut down the pipeline
      mirrorError = true;                                            // line 461
    }
  }

  private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);                            // line 528
    // ...
    boolean lastPacketInBlock = header.isLastPacketInBlock();        // line 551
    //First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        // ...
        packetReceiver.mirrorPacketTo(mirrorOut);                    // line 588
        // ...
      } catch (IOException e) {
        handleMirrorOutError(e);                                     // line 604
      }
    }
    // ...
    return lastPacketInBlock?-1:len;                                 // line 849
  }

  void receiveBlock(...) throws IOException {
    // ...
    try {
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup, 
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start();                                           // line 968
      }

      while(receivePacket() >= 0){/*Receive until the last packet*/} // line 971

      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      // Mark that responder has been closed for future processing
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();          // line 977
        responderClosed = true;
      }
      // ...
    } catch (IOException ioe) {                                      // line 1003
      // ...
    } finally {
      // ...
      if (!responderClosed) { // Data transfer was not complete.
        if (responder != null) {
          // ...
          responder.interrupt();                                     // line 1046
        }
        // ...
      }
      if (responder != null) {
        try {
          responder.interrupt();                                     // line 1053
          // ...
        } catch (InterruptedException e) {
          responder.interrupt();                                     // line 1067
          // ...
        }
        // ...
      }
    }
  }
}
{code}
    In the `BlockReceiver.receivePacket` method, if the datanode fails to 
forward the packet to the mirror (line 588) due to an IOException, the 
exception is handled at line 604, which sets the mirrorError flag at line 461. 
According to the comments, the BlockReceiver keeps going in the mirrorError 
state, and the client should be notified of the error.

    However, jstack shows that the datanode gets stuck in both the 
`DataXceiver` thread (receiving data block packets from the client) and the 
`BlockReceiver$PacketResponder` thread (replying with ACK packets to the 
client). In particular, the `DataXceiver` thread is stuck in the loop at line 
971, which in turn blocks at line 528, meaning that the `lastPacketInBlock` 
packet has not arrived and no more packets are coming in.
{code:java}
//hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

class BlockReceiver implements Closeable {
  // ...

  class PacketResponder implements Runnable, Closeable {
    // ...

    public void run() {
      // ...
      while (isRunning() && !lastPacketInBlock) {
        // ...
        try {
          // ...
          PipelineAck ack = new PipelineAck();
          // ...
          try {
            if (... && !mirrorError) {                               // line 1381
              // ...
              // read an ack from downstream datanode
              ack.readFields(downstreamIn);                          // line 1384
              // ...
            }
            // ...
          } catch (InterruptedException ine) {
            isInterrupted = true;                                    // line 1434
          } catch (IOException ioe) {
            if (Thread.interrupted()) {
              isInterrupted = true;                                  // line 1437
            } else ...
          }

          if (Thread.interrupted() || isInterrupted) {               // line 1458
            // ...
            LOG.info(myString + ": Thread is interrupted.");
            running = false;
            continue;                                                // line 1472
          }
          // ...
          sendAckUpstream(ack, expected, totalAckTimeNanos,          // line 1481
            (pkt != null ? pkt.offsetInBlock : 0),
            PipelineAck.combineHeader(datanode.getECN(), myStatus));
          // ...
        } catch (IOException e) {
          // ...
        } catch (Throwable e) {
          // ...
        }
      }
      LOG.info(myString + " terminating");
    }

    private void sendAckUpstream(...) throws IOException {
      try {
        // ...

        try {
          if (!running) return;
          sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,  // line 1568
              offsetInBlock, myHeader);
        } finally {
          // ...
        }
      } catch (InterruptedException ie) {
        // ...
      }
    }

    private void sendAckUpstreamUnprotected(...) throws IOException {
      final int[] replies;
      if (ack == null) {
        // ...
        replies = new int[] { myHeader };
      } else if (mirrorError) { // ack read error
        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
        replies = new int[] {h, h1};                                 // line 1602
      } else {
        // ...
      }
      PipelineAck replyAck = new PipelineAck(seqno, replies,
          totalAckTimeNanos);
      // ...
      replyAck.write(upstreamOut);                                   // line 1632
      // ...
    }

  }
}
{code}
    The `BlockReceiver$PacketResponder` thread checks the mirrorError flag at 
line 1381. The `DataXceiver` thread runs concurrently. If the 
`BlockReceiver$PacketResponder` finds mirrorError is false, it tries to read 
the ACK packet from downstream (the mirror, another datanode) at line 1384, 
which is a blocking call.

    However, there is a race condition. If the mirrorError flag set by the 
`handleMirrorOutError` method is observed by the check at line 1381, the 
`BlockReceiver$PacketResponder` thread skips the blocking network I/O call at 
line 1384. Instead, it proceeds to line 1481, then 1568, then 1632. As the 
code around line 1602 shows, this ACK contains `Status.ERROR`, which warns the 
client. Conversely, if the mirrorError flag is set only after the check at 
line 1381, the `BlockReceiver$PacketResponder` thread blocks at line 1384. In 
our scenario, the data block packet was never sent to the mirror datanode due 
to the IOException, so the corresponding ACK packet will never be sent by the 
mirror datanode either. Therefore, the `BlockReceiver$PacketResponder` thread 
stays blocked there for a long time.
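
    To make the interleaving concrete, below is a minimal, self-contained 
sketch of the same check-then-block pattern (all names are illustrative, not 
Hadoop code): a volatile flag is checked and, if clear, the thread enters a 
blocking read; setting the flag afterwards cannot wake it up.
{code:java}
import java.util.concurrent.SynchronousQueue;

public class CheckThenBlockRace {
  static volatile boolean mirrorError = false;
  // Stands in for the downstream socket; nothing is ever offered,
  // mimicking a mirror that never sends the ACK.
  static final SynchronousQueue<Integer> downstream = new SynchronousQueue<>();

  public static void main(String[] args) throws Exception {
    Thread responder = new Thread(() -> {
      try {
        if (!mirrorError) {   // the check (cf. line 1381)...
          downstream.take();  // ...then the blocking "ACK read" (cf. line 1384)
        }
      } catch (InterruptedException ignored) {
      }
    });
    responder.start();

    Thread.sleep(100);        // let the responder pass the check
    mirrorError = true;       // too late: the responder is already blocked
    responder.join(2000);
    System.out.println("responder still blocked: " + responder.isAlive());
    responder.interrupt();    // clean up so the JVM can exit
  }
}
{code}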

*Fix*

    The key is to eliminate the problematic concurrency between 
`BlockReceiver#receivePacket` and the reading of ACK packets (from the 
downstream mirror datanode) in `BlockReceiver$PacketResponder`. The simplest 
way to do this is: every time `BlockReceiver#receivePacket` successfully 
forwards a packet to the downstream mirror datanode, grant 
`BlockReceiver$PacketResponder` one chance to check the mirrorError state and 
read the ACK with the blocking I/O call. This is sound because if the datanode 
has not sent a packet, it is impossible for the `BlockReceiver$PacketResponder` 
to receive the corresponding ACK.

    The implementation only needs a semaphore in 
`BlockReceiver$PacketResponder`, and does not affect any other component.
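
    Below is a minimal sketch of this idea (illustrative only, not the actual 
patch): the receiver thread releases one permit per packet successfully 
written to the mirror, and the responder acquires a permit before issuing the 
blocking ACK read, so it can never block waiting for an ACK that will never 
arrive.
{code:java}
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class MirrorAckGate {
  // Permits track packets actually forwarded downstream.
  private final Semaphore forwarded = new Semaphore(0);

  // Called by the receiver (DataXceiver thread) after
  // packetReceiver.mirrorPacketTo(mirrorOut) succeeds (cf. line 588).
  void onPacketForwarded() {
    forwarded.release();
  }

  // Called by the PacketResponder before ack.readFields(downstreamIn)
  // (cf. line 1384). Returns false if no packet was forwarded within the
  // timeout, e.g. because mirrorError was set; the responder should then
  // re-check mirrorError instead of blocking on the downstream socket.
  boolean awaitForwardedPacket(long timeoutMillis) throws InterruptedException {
    return forwarded.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
{code}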

*P.S.*

    Here we discuss only the reasoning behind the symptom and the fix of this 
issue. This bug also involves some client-side behavior, but that reasoning is 
a little more complex. We have a complete analysis 
([https://docs.google.com/document/d/1Hq1qhbNFfS7y9zTNZ0VXsN3rxqExlMPaAqz4RfCurpE/edit?usp=sharing])
 for reference, which analyzes the packet-receiving and packet-sending threads 
of both the datanode and the client, and explains how the aforementioned 
injection can leave these 4 threads stuck in a "deadlock".

*Reproduction*

    Start HDFS with the default configuration. Then run a client (we used the 
command `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt` in the terminal). For 
each data block packet the client sends to the datanode, the datanode forwards 
it at line 588 of `BlockReceiver.java` 
([https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java#L588]).
 Inject a single IOException there.

    Most of the time, the concurrency condition needed to trigger this bug 
does not occur. The reliable way we use to reproduce it is to set 
`dfs.blocksize` to `1m` in `hdfs-site.xml`, then run `bin/hdfs dfs 
-copyFromLocal ./foo.txt /1.txt`, where `./foo.txt` is a 15MB file (generated 
by `fallocate -l 15000000 foo.txt`), and perform the aforementioned injection 
at the 12th occurrence of 
[https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java#L747].
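
    For illustration, a one-shot fault helper like the following (our own 
hypothetical code, not part of Hadoop) can drive the injection; in our 
experiments it guards the `mirrorPacketTo` call at line 588 and fires on the 
12th packet.
{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: throws an IOException exactly once, on the N-th call.
final class OneShotFault {
  private final AtomicInteger calls = new AtomicInteger();
  private final int fireOn;

  OneShotFault(int fireOn) {
    this.fireOn = fireOn;
  }

  // Invoke right before the real mirror write; throws on the N-th call only.
  void maybeThrow() throws IOException {
    if (calls.incrementAndGet() == fireOn) {
      throw new IOException("injected mirror write failure");
    }
  }
}
{code}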


> The lack of packet-level mirrorError state synchronization in 
> BlockReceiver$PacketResponder can cause the HDFS client to hang
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HDFS-15925
>                 URL: https://issues.apache.org/jira/browse/HDFS-15925
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: datanode
>    Affects Versions: 3.2.2
>            Reporter: Haoze Wu
>            Priority: Critical
>