Author: rangadi
Date: Tue Nov 11 11:05:36 2008
New Revision: 713112
URL: http://svn.apache.org/viewvc?rev=713112&view=rev
Log:
HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi)
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=713112&r1=713111&r2=713112&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Nov 11 11:05:36 2008
@@ -971,6 +971,8 @@
HADOOP-4595. Fixes two race conditions - one to do with updating free slot
count,
and another to do with starting the MapEventsFetcher thread. (ddas)
+ HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi)
+
Release 0.18.3 - Unreleased
BUG FIXES
Modified:
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java?rev=713112&r1=713111&r2=713112&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
(original)
+++ hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
Tue Nov 11 11:05:36 2008
@@ -40,6 +40,7 @@
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -475,17 +476,28 @@
// long time, discard them.
//
LOG.debug("Checking for old call responses.");
+ ArrayList<Call> calls;
+
+ // get the list of channels from list of keys.
synchronized (writeSelector.keys()) {
+ calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
- try {
- doPurge(key, now);
- } catch (IOException e) {
- LOG.warn("Error in purging old calls " + e);
+ Call call = (Call)key.attachment();
+ if (call != null && key.channel() == call.connection.channel) {
+ calls.add(call);
}
}
}
+
+ for(Call call : calls) {
+ try {
+ doPurge(call, now);
+ } catch (IOException e) {
+ LOG.warn("Error in purging old calls " + e);
+ }
+ }
} catch (OutOfMemoryError e) {
//
// we can run out of memory if we have too many threads
@@ -531,15 +543,7 @@
// Remove calls that have been pending in the responseQueue
// for a long time.
//
- private void doPurge(SelectionKey key, long now) throws IOException {
- Call call = (Call)key.attachment();
- if (call == null) {
- return;
- }
- if (key.channel() != call.connection.channel) {
- LOG.info("doPurge: bad channel");
- return;
- }
+ private void doPurge(Call call, long now) throws IOException {
LinkedList<Call> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0);