[
https://issues.apache.org/jira/browse/HBASE-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrew Kyle Purtell updated HBASE-26679:
----------------------------------------
Fix Version/s: 2.5.0
3.0.0-alpha-3
2.4.10
> Wait on the future returned by FanOutOneBlockAsyncDFSOutput.flush would stuck
> -----------------------------------------------------------------------------
>
> Key: HBASE-26679
> URL: https://issues.apache.org/jira/browse/HBASE-26679
> Project: HBase
> Issue Type: Bug
> Components: wal
> Affects Versions: 3.0.0-alpha-2, 2.4.9
> Reporter: chenglei
> Assignee: chenglei
> Priority: Major
> Fix For: 2.5.0, 3.0.0-alpha-3, 2.4.10
>
>
> Consider three DataNodes: dn1, dn2, and dn3. We write some data to
> {{FanOutOneBlockAsyncDFSOutput}} and then flush it, so there is one
> {{Callback}} in {{FanOutOneBlockAsyncDFSOutput.waitingAckQueue}}. If the ack
> from dn1 arrives first, Netty invokes
> {{FanOutOneBlockAsyncDFSOutput.completed}} with dn1's channel, and
> {{FanOutOneBlockAsyncDFSOutput.completed}} removes dn1's channel from
> {{Callback.unfinishedReplicas}}.
> But dn2 and dn3 respond slowly. Before dn2 and dn3 send their acks, dn1 is
> shut down or hits an exception, so Netty triggers
> {{FanOutOneBlockAsyncDFSOutput.failed}} with dn1's channel. Because
> {{Callback.unfinishedReplicas}} no longer contains dn1's channel, the
> {{Callback}} is skipped in {{FanOutOneBlockAsyncDFSOutput.failed}} (line 250
> below), even though line 245 has already set
> {{FanOutOneBlockAsyncDFSOutput.state}} to {{State.BROKEN}}.
> {code:java}
> 233 private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
> 234   if (state == State.BROKEN || state == State.CLOSED) {
> 235     return;
> 236   }
> ....
> 244   // disable further write, and fail all pending ack.
> 245   state = State.BROKEN;
> 246   Throwable error = errorSupplier.get();
> 247   for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
> 248     Callback c = iter.next();
> 249     // find the first sync request which we have not acked yet and fail all the request after it.
> 250     if (!c.unfinishedReplicas.contains(channel.id())) {
> 251       continue;
> 252     }
> 253     for (;;) {
> 254       c.future.completeExceptionally(error);
> 255       if (!iter.hasNext()) {
> 256         break;
> 257       }
> 258       c = iter.next();
> 259     }
> 260     break;
> 261   }
> 262   datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
> 263 }
> {code}
> At the end of the method above, at line 262, the channels to dn1, dn2, and
> dn3 are all closed, so Netty triggers
> {{FanOutOneBlockAsyncDFSOutput.failed}} again for dn2 and dn3. But at line
> 234, because {{FanOutOneBlockAsyncDFSOutput.state}} is already
> {{State.BROKEN}}, the whole {{FanOutOneBlockAsyncDFSOutput.failed}} method
> is skipped. The {{Callback}} is therefore never completed, and waiting on
> the future returned by {{FanOutOneBlockAsyncDFSOutput.flush}} would be
> stuck forever.
> When we roll the WAL, we create a new {{FanOutOneBlockAsyncDFSOutput}}
> and a new {{AsyncProtobufLogWriter}}. In {{AsyncProtobufLogWriter.init}} we
> write the WAL header to {{FanOutOneBlockAsyncDFSOutput}} and wait for it to
> complete. If we run into this situation, the roll would be stuck forever.
> I have simulated this case in the PR. My fix is that even though
> {{FanOutOneBlockAsyncDFSOutput.state}} is already {{State.BROKEN}}, we
> still try to trigger {{Callback.future}}.
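
The sequence described above can be reproduced with a small standalone model. This is only a sketch under stated assumptions, not the HBase code or the actual patch: the class FlushHangSketch, its nested Output type, the simulate helper, and the failPendingWhenBroken flag are all hypothetical names introduced for illustration, channels are plain strings instead of Netty channels, and the repeated failed calls that Netty would make after the channels are closed are issued by hand.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified model of the ack bookkeeping described in the issue.
class FlushHangSketch {
  enum State { STREAMING, BROKEN, CLOSED }

  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;

    Callback(List<String> replicas) {
      this.unfinishedReplicas = new HashSet<>(replicas);
    }
  }

  static final class Output {
    private final List<String> channels;
    // Assumption: models the proposed fix; when false, behaves like the reported code.
    private final boolean failPendingWhenBroken;
    private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
    private State state = State.STREAMING;

    Output(List<String> channels, boolean failPendingWhenBroken) {
      this.channels = channels;
      this.failPendingWhenBroken = failPendingWhenBroken;
    }

    synchronized CompletableFuture<Long> flush() {
      Callback c = new Callback(channels);
      waitingAckQueue.addLast(c);
      return c.future;
    }

    // An ack arrived from one replica for the oldest pending flush.
    synchronized void completed(String channel) {
      Callback head = waitingAckQueue.peekFirst();
      if (head == null) {
        return;
      }
      head.unfinishedReplicas.remove(channel);
      if (head.unfinishedReplicas.isEmpty()) {
        waitingAckQueue.pollFirst();
        head.future.complete(0L);
      }
    }

    synchronized void failed(String channel, Supplier<Throwable> errorSupplier) {
      if (state == State.CLOSED) {
        return;
      }
      if (state == State.BROKEN) {
        // Reported behaviour: return immediately. Sketched fix: still fail pending callbacks.
        if (failPendingWhenBroken) {
          failWaitingAckQueue(channel, errorSupplier);
        }
        return;
      }
      state = State.BROKEN;
      failWaitingAckQueue(channel, errorSupplier);
    }

    private void failWaitingAckQueue(String channel, Supplier<Throwable> errorSupplier) {
      Throwable error = errorSupplier.get();
      for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
        Callback c = iter.next();
        // Skip callbacks this channel has already acked.
        if (!c.unfinishedReplicas.contains(channel)) {
          continue;
        }
        // Fail this callback and every callback queued after it.
        for (;;) {
          c.future.completeExceptionally(error);
          if (!iter.hasNext()) {
            break;
          }
          c = iter.next();
        }
        break;
      }
    }
  }

  // Replays the scenario: dn1 acks, dn1 fails, then dn2 and dn3 fail after being closed.
  static CompletableFuture<Long> simulate(boolean failPendingWhenBroken) {
    Output out = new Output(Arrays.asList("dn1", "dn2", "dn3"), failPendingWhenBroken);
    CompletableFuture<Long> future = out.flush();
    out.completed("dn1");
    out.failed("dn1", () -> new RuntimeException("dn1 channel inactive"));
    out.failed("dn2", () -> new RuntimeException("dn2 channel closed"));
    out.failed("dn3", () -> new RuntimeException("dn3 channel closed"));
    return future;
  }

  public static void main(String[] args) {
    System.out.println("without fix, future done: " + simulate(false).isDone());
    System.out.println("with fix, future failed: "
        + simulate(true).isCompletedExceptionally());
  }
}
```

In this model the future is never completed when failPendingWhenBroken is false, which corresponds to the hang in the report. With the flag set, the failed call for dn2 completes the future exceptionally, so a waiter would see an error instead of blocking.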
--
This message was sent by Atlassian Jira
(v8.20.1#820001)