[
https://issues.apache.org/jira/browse/HBASE-26679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
chenglei updated HBASE-26679:
-----------------------------
Description:
Consider three DataNodes, dn1, dn2, and dn3. We write some data to
{{FanOutOneBlockAsyncDFSOutput}} and then flush it, so there is one {{Callback}}
in {{FanOutOneBlockAsyncDFSOutput.waitingAckQueue}}. If the ack from dn1
arrives first, Netty invokes {{FanOutOneBlockAsyncDFSOutput.completed}} with
dn1's channel, and {{FanOutOneBlockAsyncDFSOutput.completed}} removes dn1's
channel from {{Callback.unfinishedReplicas}}.
Now suppose dn2 and dn3 respond slowly, and dn1 dies before either of them
sends its ack. Netty then triggers {{FanOutOneBlockAsyncDFSOutput.failed}} with
dn1's channel. Because {{Callback.unfinishedReplicas}} no longer contains dn1's
channel, the {{Callback}} is skipped in {{FanOutOneBlockAsyncDFSOutput.failed}}
(line 250 below), while line 245 has already set
{{FanOutOneBlockAsyncDFSOutput.state}} to {{State.BROKEN}}:
{code:java}
233  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
234    if (state == State.BROKEN || state == State.CLOSED) {
235      return;
236    }
....
244    // disable further write, and fail all pending ack.
245    state = State.BROKEN;
246    Throwable error = errorSupplier.get();
247    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
248      Callback c = iter.next();
249      // find the first sync request which we have not acked yet and fail all the request after it.
250      if (!c.unfinishedReplicas.contains(channel.id())) {
251        continue;
252      }
253      for (;;) {
254        c.future.completeExceptionally(error);
255        if (!iter.hasNext()) {
256          break;
257        }
258        c = iter.next();
259      }
260      break;
261    }
262    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
263  }
{code}
At the end of the method above, the channels to dn1, dn2, and dn3 are all
closed, so {{FanOutOneBlockAsyncDFSOutput.failed}} is triggered again for dn2
and dn3. But at line 234, because {{FanOutOneBlockAsyncDFSOutput.state}} is
already {{State.BROKEN}}, the whole {{FanOutOneBlockAsyncDFSOutput.failed}}
is skipped. As a result {{Callback.future}} is never completed, and the wait on
the future returned by {{FanOutOneBlockAsyncDFSOutput.flush}} is stuck forever.
When we roll the WAL, we create a new {{FanOutOneBlockAsyncDFSOutput}} and a
new {{AsyncProtobufLogWriter}}. In {{AsyncProtobufLogWriter.init}} we write the
WAL header to {{FanOutOneBlockAsyncDFSOutput}} and wait for it to complete. If
we run into this situation, the roll is stuck forever.
I have simulated this case in the PR. My fix is that even though
{{FanOutOneBlockAsyncDFSOutput.state}} is already {{State.BROKEN}}, we still
try to complete {{Callback.future}}.
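To make the sequence easier to follow, here is a minimal standalone sketch of the ack bookkeeping. It is a simplified model, not the real HBase class: {{FanOutSketch}}, the {{failPendingWhenBroken}} flag, the {{flushFutureCompletes}} helper, and the use of plain strings in place of Netty channels are all hypothetical names introduced for illustration. With the flag off it mirrors the early return at line 234; with the flag on it approximates the idea of the fix, and the actual change in the PR may differ.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the ack bookkeeping; not the real HBase class. */
class FanOutSketch {
  enum State { STREAMING, BROKEN, CLOSED }

  /** One pending flush: completes once every replica has acked. */
  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;
    Callback(List<String> channels) { unfinishedReplicas = new HashSet<>(channels); }
  }

  private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
  private final List<String> channels;
  // Assumption: a switch between the reported behavior (false) and the proposed idea (true).
  private final boolean failPendingWhenBroken;
  private State state = State.STREAMING;

  FanOutSketch(List<String> channels, boolean failPendingWhenBroken) {
    this.channels = channels;
    this.failPendingWhenBroken = failPendingWhenBroken;
  }

  synchronized CompletableFuture<Long> flush() {
    Callback c = new Callback(channels);
    waitingAckQueue.addLast(c);
    return c.future;
  }

  /** An ack arrived on the given channel for the oldest pending flush. */
  synchronized void completed(String channel) {
    Callback c = waitingAckQueue.peekFirst();
    if (c == null) {
      return;
    }
    c.unfinishedReplicas.remove(channel);
    if (c.unfinishedReplicas.isEmpty()) {
      waitingAckQueue.removeFirst();
      c.future.complete(0L);
    }
  }

  synchronized void failed(String channel, Throwable error) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      // Reported behavior: return without touching the pending callbacks.
      if (failPendingWhenBroken) {
        failWaitingAcks(channel, error);
      }
      return;
    }
    state = State.BROKEN;
    failWaitingAcks(channel, error);
    // The real method closes every channel here, which makes Netty call failed()
    // again for the other DataNodes; the caller below simulates those calls.
  }

  /** Same loop as lines 247-261: fail the first callback still waiting on this channel and all after it. */
  private void failWaitingAcks(String channel, Throwable error) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      if (!c.unfinishedReplicas.contains(channel)) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  /** Replays the scenario: dn1 acks, dn1 dies, then dn2 and dn3 are closed. */
  static boolean flushFutureCompletes(boolean failPendingWhenBroken) {
    FanOutSketch out = new FanOutSketch(Arrays.asList("dn1", "dn2", "dn3"), failPendingWhenBroken);
    CompletableFuture<Long> future = out.flush();
    out.completed("dn1");
    Throwable error = new IOException("dn1 is dead");
    out.failed("dn1", error); // skips the callback, sets BROKEN
    out.failed("dn2", error); // triggered by the channel close
    out.failed("dn3", error);
    return future.isDone();
  }

  public static void main(String[] args) {
    System.out.println(flushFutureCompletes(false)); // false: the flush future never completes
    System.out.println(flushFutureCompletes(true));  // true: the future is completed exceptionally
  }
}
```

In this model the only difference between the two runs is whether the second and third {{failed}} calls are allowed to walk {{waitingAckQueue}} after the state is already {{BROKEN}}.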
> Wait on the future returned by FanOutOneBlockAsyncDFSOutput.flush would stuck
> -----------------------------------------------------------------------------
>
> Key: HBASE-26679
> URL: https://issues.apache.org/jira/browse/HBASE-26679
> Project: HBase
> Issue Type: Bug
> Components: wal
> Affects Versions: 3.0.0-alpha-2, 2.4.9
> Reporter: chenglei
> Priority: Major