[ 
https://issues.apache.org/jira/browse/HBASE-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenglei updated HBASE-26679:
-----------------------------
    Description: 
Consider three DataNodes: dn1, dn2 and dn3. We write some data to 
{{FanOutOneBlockAsyncDFSOutput}} and then flush it, so there is one {{Callback}} 
in {{FanOutOneBlockAsyncDFSOutput.waitingAckQueue}}. If the ack from dn1 
arrives first, Netty invokes {{FanOutOneBlockAsyncDFSOutput.completed}} with 
dn1's channel, and {{FanOutOneBlockAsyncDFSOutput.completed}} removes dn1's 
channel from {{Callback.unfinishedReplicas}}. 
dn2 and dn3 respond slowly, and before either of them sends its ack, dn1 is 
shut down or hits an exception, so Netty triggers 
{{FanOutOneBlockAsyncDFSOutput.failed}} with dn1's channel. Because 
{{Callback.unfinishedReplicas}} no longer contains dn1's channel, the 
{{Callback}} is skipped in {{FanOutOneBlockAsyncDFSOutput.failed}} (line 250 
below), while at line 245 {{FanOutOneBlockAsyncDFSOutput.state}} is already 
set to {{State.BROKEN}}:
{code:java}
233  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
234    if (state == State.BROKEN || state == State.CLOSED) {
235      return;
236    }
     ....
244    // disable further write, and fail all pending ack.
245    state = State.BROKEN;
246    Throwable error = errorSupplier.get();
247    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
248      Callback c = iter.next();
249      // find the first sync request which we have not acked yet and fail all the request after it.
250      if (!c.unfinishedReplicas.contains(channel.id())) {
251        continue;
252      }
253      for (;;) {
254        c.future.completeExceptionally(error);
255        if (!iter.hasNext()) {
256          break;
257        }
258        c = iter.next();
259      }
260      break;
261    }
262    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
263  }
{code}
At the end of the method above, the channels to dn1, dn2 and dn3 are all 
closed, so {{FanOutOneBlockAsyncDFSOutput.failed}} is triggered again for dn2 
and dn3. But at line 234, because {{FanOutOneBlockAsyncDFSOutput.state}} is 
already {{State.BROKEN}}, the whole {{FanOutOneBlockAsyncDFSOutput.failed}} 
body is skipped. Nothing ever completes the pending {{Callback.future}}, so a 
wait on the future returned by {{FanOutOneBlockAsyncDFSOutput.flush}} is stuck 
forever.
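
To make the sequence concrete, here is a minimal sketch of it. It is not the HBase code: {{StuckFlushSketch}}, its string channel ids and the single-callback {{completed}} are simplifications assumed for illustration, and the re-entrant {{failed}} calls that Netty would make after the channels are closed are simulated by calling {{failed}} directly.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the ack bookkeeping (not the real class).
class StuckFlushSketch {
  enum State { STREAMING, BROKEN, CLOSED }

  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;

    Callback(String... replicas) {
      unfinishedReplicas = new HashSet<>(Arrays.asList(replicas));
    }
  }

  private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
  private State state = State.STREAMING;

  synchronized CompletableFuture<Long> flush(String... replicas) {
    Callback c = new Callback(replicas);
    waitingAckQueue.addLast(c);
    return c.future;
  }

  // Simplification: only the head callback is acked.
  synchronized void completed(String channel) {
    Callback head = waitingAckQueue.peekFirst();
    if (head != null && head.unfinishedReplicas.remove(channel)
        && head.unfinishedReplicas.isEmpty()) {
      waitingAckQueue.removeFirst();
      head.future.complete(0L);
    }
  }

  // Mirrors the shape of the quoted failed(): early return, then skip by channel.
  synchronized void failed(String channel, Throwable error) {
    if (state == State.BROKEN || state == State.CLOSED) {
      return; // corresponds to line 234
    }
    state = State.BROKEN; // corresponds to line 245
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      if (!c.unfinishedReplicas.contains(channel)) {
        continue; // corresponds to line 250: dn1 already acked, so skipped
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  // Runs the dn1/dn2/dn3 scenario and reports whether the flush future finished.
  static boolean futureDoneAfterScenario() {
    StuckFlushSketch out = new StuckFlushSketch();
    CompletableFuture<Long> f = out.flush("dn1", "dn2", "dn3");
    out.completed("dn1");                              // dn1 acks first
    out.failed("dn1", new IOException("dn1 closed"));  // dn1 dies; state -> BROKEN
    out.failed("dn2", new IOException("dn2 closed"));  // ignored: already BROKEN
    out.failed("dn3", new IOException("dn3 closed"));  // ignored: already BROKEN
    return f.isDone();
  }

  public static void main(String[] args) {
    System.out.println("future done: " + futureDoneAfterScenario());
  }
}
```

In this model the future is still pending after all three {{failed}} calls, which is the hang described above.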

When we roll the WAL, we create a new {{FanOutOneBlockAsyncDFSOutput}} and a 
new {{AsyncProtobufLogWriter}}. In {{AsyncProtobufLogWriter.init}} we write 
the WAL header to {{FanOutOneBlockAsyncDFSOutput}} and wait for the write to 
complete. If we run into this situation there, the roll is stuck forever.

I have simulated this case in the PR. My fix is that even when 
{{FanOutOneBlockAsyncDFSOutput.state}} is already {{State.BROKEN}}, we still 
try to complete {{Callback.future}}.
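
One possible shape of that idea, as a sketch under assumptions rather than the actual patch: {{FixedFlushSketch}}, the string channel ids and the helper {{failWaitingAckQueue}} are hypothetical names used only for this illustration. The point is that a {{failed}} call arriving in {{State.BROKEN}} still walks the queue instead of returning immediately.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model; not the real FanOutOneBlockAsyncDFSOutput.
class FixedFlushSketch {
  enum State { STREAMING, BROKEN, CLOSED }

  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;

    Callback(String... replicas) {
      unfinishedReplicas = new HashSet<>(Arrays.asList(replicas));
    }
  }

  private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
  private State state = State.STREAMING;

  synchronized CompletableFuture<Long> flush(String... replicas) {
    Callback c = new Callback(replicas);
    waitingAckQueue.addLast(c);
    return c.future;
  }

  // Simplification: only the head callback is acked.
  synchronized void completed(String channel) {
    Callback head = waitingAckQueue.peekFirst();
    if (head != null && head.unfinishedReplicas.remove(channel)
        && head.unfinishedReplicas.isEmpty()) {
      waitingAckQueue.removeFirst();
      head.future.complete(0L);
    }
  }

  // Fails the first callback still waiting on this channel and every one after it.
  private void failWaitingAckQueue(String channel, Throwable error) {
    boolean failing = false;
    for (Callback c : waitingAckQueue) {
      failing = failing || c.unfinishedReplicas.contains(channel);
      if (failing) {
        // No-op if the future was already completed by an earlier call.
        c.future.completeExceptionally(error);
      }
    }
  }

  synchronized void failed(String channel, Throwable error) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      // Already broken: still give pending callbacks a chance to fail.
      failWaitingAckQueue(channel, error);
      return;
    }
    state = State.BROKEN;
    failWaitingAckQueue(channel, error);
  }

  // Same dn1/dn2/dn3 scenario; reports whether the flush future failed.
  static boolean futureFailsAfterScenario() {
    FixedFlushSketch out = new FixedFlushSketch();
    CompletableFuture<Long> f = out.flush("dn1", "dn2", "dn3");
    out.completed("dn1");                              // dn1 acks first
    out.failed("dn1", new IOException("dn1 closed"));  // callback skipped, state -> BROKEN
    out.failed("dn2", new IOException("dn2 closed"));  // callback still waits on dn2: failed
    return f.isCompletedExceptionally();
  }

  public static void main(String[] args) {
    System.out.println("future failed: " + futureFailsAfterScenario());
  }
}
```

With this change the second {{failed}} call, for dn2, completes the future exceptionally, so the caller waiting on the flush sees an error instead of hanging.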

> Wait on the future returned by FanOutOneBlockAsyncDFSOutput.flush would stuck
> -----------------------------------------------------------------------------
>
>                 Key: HBASE-26679
>                 URL: https://issues.apache.org/jira/browse/HBASE-26679
>             Project: HBase
>          Issue Type: Bug
>          Components: wal
>    Affects Versions: 3.0.0-alpha-2, 2.4.9
>            Reporter: chenglei
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
