Hari Shreedharan created FLUME-1748:
---------------------------------------
Summary: HDFS Sink can end up in a bad state if the Callables are interrupted.
Key: FLUME-1748
URL: https://issues.apache.org/jira/browse/FLUME-1748
Project: Flume
Issue Type: Bug
Reporter: Hari Shreedharan
If one or more of the HDFS Sink's write/flush/close calls takes too long, the
thread running it is interrupted by the future.cancel(true) call. The Hadoop
RPC client catches the resulting InterruptedException and re-sets the interrupt
flag on the interrupted thread. Likewise, if the thread is interrupted after
the RPC call completes but before the call() method returns, the interrupt flag
stays set on the thread. A subsequent HDFS file open on that thread then fails
with an exception of this sort:
{code}
[SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457) - HDFS IO error
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
    at org.apache.hadoop.ipc.Client.call(Client.java:1164)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
    at $Proxy9.create(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
    at $Proxy9.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
    at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
    at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
    at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
    at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
    at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
    at org.apache.hadoop.ipc.Client.call(Client.java:1140)
    ... 36 more
{code}
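To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only; the class name is made up and no Flume or Hadoop code is involved) of how a leftover interrupt flag breaks the next interruptible-channel operation on the same thread:
{code}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;

public class LeftoverInterruptDemo {
  public static void main(String[] args) throws IOException {
    // Mimic the Hadoop RPC client's behavior: the InterruptedException is
    // swallowed, but the interrupt flag is re-set on the current thread.
    Thread.currentThread().interrupt();

    // Any later interruptible-channel operation on this thread now fails
    // immediately: the channel machinery sees the pending interrupt on
    // entry, closes the channel, and throws ClosedByInterruptException,
    // just like the NameNode connect in the trace above.
    try (SocketChannel channel = SocketChannel.open()) {
      channel.connect(new InetSocketAddress("example.com", 80));
    } catch (ClosedByInterruptException e) {
      System.out.println("connect failed: interrupt flag was still set");
    }
  }
}
{code}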
The relevant code that re-sets the interrupt flag is in
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the
InterruptedException is caught, and the interrupt flag is re-set once the wait
for the result is done.
{code}
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId) throws InterruptedException, IOException {
  Call call = new Call(rpcKind, rpcRequest);
  Connection connection = getConnection(remoteId, call);
  connection.sendParam(call);                 // send the parameter
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();                          // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
                address.getPort(),
                NetUtils.getHostname(),
                0,
                call.error);
      }
    } else {
      return call.getRpcResult();
    }
  }
}
{code}
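One defensive direction, sketched below under the assumption that clearing the flag at the Flume layer is acceptable (the helper name is hypothetical and this is not a committed patch): have each of the sink's open/write/flush/close callables check for and clear a stale interrupt flag before touching HDFS, so a timed-out earlier call surfaces as a clean InterruptedIOException instead of poisoning the next open with ClosedByInterruptException.
{code}
import java.io.InterruptedIOException;

// Hypothetical helper for illustration only -- not the committed fix.
final class InterruptGuard {
  private InterruptGuard() { }

  /**
   * Fails fast if a previous future.cancel(true) left the interrupt flag
   * set on this thread. Thread.interrupted() both reads AND clears the
   * flag, so the HDFS call that follows starts from a clean state.
   */
  static void checkAndClearInterrupt() throws InterruptedIOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException(
          "Thread was interrupted before the HDFS call was made, "
          + "likely by an earlier call timeout");
    }
  }
}
{code}
A callable would invoke InterruptGuard.checkAndClearInterrupt() on entry; since HDFSEventSink.process already handles IOException as an HDFS IO error (as in the log above), the InterruptedIOException would be retried on the next batch rather than leaving the BucketWriter in a bad state.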