Update of /cvsroot/freenet/freenet/src/freenet/transport
In directory sc8-pr-cvs1:/tmp/cvs-serv25857/src/freenet/transport
Modified Files:
Tag: stable
AbstractSelectorLoop.java ListenSelectorLoop.java
NIOReader.java ReadSelectorLoop.java
ThrottledSelectorLoop.java WriteSelectorLoop.java
tcpAddress.java tcpConnection.java tcpTransport.java
Log Message:
5029: Merge from unstable after months of work. MASSIVE changes.
Highlights:
* Next Generation Routing, massive related changes
* Major changes to handling of messages and connections (PeerHandler and related changes)
* Even more non-blocking I/O
* Documentation improvements
* Lots of new diagnostics and config options
* Lots of bug fixes and performance tweaking
* Probably lots of new bugs too!
Index: AbstractSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/AbstractSelectorLoop.java,v
retrieving revision 1.15.2.17
retrieving revision 1.15.2.18
diff -u -w -r1.15.2.17 -r1.15.2.18
--- AbstractSelectorLoop.java 23 Aug 2003 12:04:43 -0000 1.15.2.17
+++ AbstractSelectorLoop.java 28 Oct 2003 20:20:48 -0000 1.15.2.18
@@ -30,6 +30,8 @@
private static LinkedList closeQueue = null;
private static LinkedList preCloseQueue = null;
+ private static Object preCloseQueueObject = new Object(); //necessary cause we
change preCloseQueue
+
private static boolean isWindows =
(System.getProperty("os.name").toLowerCase().indexOf("windows") != -1);
private static CloseThread closeThread= null;
@@ -160,12 +162,14 @@
public Connection conn;
public Object attachment;
public SocketChannel sc;
+ public long timeStamp; //debugging. Can be removed eventually
public ClosePair(Connection conn, Object attachment,
SocketChannel sc) {
this.conn = conn;
this.attachment = attachment;
this.sc = sc;
+ timeStamp = System.currentTimeMillis();
}
public String toString() {
@@ -224,6 +228,9 @@
}
protected abstract int myKeyOps();
+ public SelectionKey keyFor(SelectableChannel ch) {
+ return ch.keyFor(sel);
+ }
/**
* register/unregister waiters in the queues
@@ -276,6 +283,10 @@
} catch (ClassCastException e) {
// Not an NIOCallback
continue;
+ } catch (CancelledKeyException e) {
+ //do nothing?
+ if (logDebug)
+
Core.logger.log(this,"cought "+e,Logger.DEBUG);
}
} else {
notRegistered.add(current);
@@ -291,20 +302,39 @@
//then remove
synchronized(unregisterWaiters) {
while (unregisterWaiters.size() >0) {
- ChannelAttachmentPair current =
(ChannelAttachmentPair)unregisterWaiters.removeFirst();
- if (current.channel!=null) //we have a channel
- current.channel.keyFor(sel).cancel();
+ ChannelAttachmentPair current;
+ try {
+ current =
+
(ChannelAttachmentPair)unregisterWaiters.removeFirst();
+ } catch (NoSuchElementException e) {
+ Core.logger.log(this, "Parallel removal of elements in
"+this+
+ "?: "+e, e,
Logger.ERROR);
+ break;
+ }
+ if (current.channel!=null) {
+ //we have a channel
+ SelectionKey k = current.channel.keyFor(sel);
+ if(k != null) k.cancel();
+ }
// Not used by WSL, so we don't need to tell it
else if (current.attachment!=null) { //we have only the
attachment
Iterator i = sel.keys().iterator();
while(i.hasNext()) {
SelectionKey curKey = (SelectionKey)i.next();
- if
(curKey.attachment().equals(current.attachment)){
+ if(curKey == null) continue;
+ if(!curKey.isValid()) continue;
+ Object attachment = curKey.attachment();
+ if(attachment == null) {
+ if(logDebug)
+ Core.logger.log(this, "Key
"+curKey+" has null "+
+
"attachment unregistering "+
+
current, Logger.ERROR);
+ curKey.cancel();
+ } else if
(attachment.equals(current.attachment)) {
curKey.cancel();
break;
}
}
-
}
if (current.attachment!=null)
try{
@@ -347,7 +377,15 @@
}
try {
+ if(sel.isOpen()) {
+ try {
sel.close();
+ } catch (Throwable t) {
+ Core.logger.log(this, "Caught "+t+" closing
selector in reset()",
+ t,
Logger.ERROR);
+ }
+ // Try open() anyway
+ }
sel = Selector.open();
@@ -397,26 +435,63 @@
* channels are closed
* now this cancel()'s the keys and doesn't notify the close thread --zab
*/
- private void handleCloseQueuePipe()
- {
- synchronized(closeQueue) {
- synchronized(preCloseQueue) {
- //boolean wasEmpty = closeQueue.isEmpty();
- //boolean itemAdded = false;
+ private void handleCloseQueuePipe() {
+ //We aren't synchronized on preCloseQueue here but it doesn't mean the
end of the
+ //world that we queue the close a little while later instead (as of
+ //2003-10-16 this method is called by three selector threads).
+ //This extra check is done to avoid the
'synchronized(preCloseQueueObject)'
+ //scope below if it probably wouldn't gain us something to enter it
+ //(locking optimization)
+ if(preCloseQueue.isEmpty())
+ return;
+
+ //Remove all entries from the preCloseQueue into a local queue
+ //in order to prevent other threads from beeing blocked on
preCloseQueue
+ //during the processing below (locking optimization)
+ LinkedList workList = new LinkedList();
+ synchronized (preCloseQueueObject) {
while(!preCloseQueue.isEmpty()) {
- ClosePair current =
(ClosePair)preCloseQueue.removeFirst();
- if (current.sc != null &&
current.sc.keyFor(sel) != null)
- current.sc.keyFor(sel).cancel();
- closeQueue.addLast(current);
- // itemAdded = true;
+ workList.addLast(preCloseQueue.removeFirst());
+ }
}
- /*if(wasEmpty && itemAdded)
- closeQueue.notify();*/
+ //Cancel the keys that have been queued for closing on all ?known?
+ //selector threads
+ Iterator it = workList.iterator();
+ ClosePair current;
+ while(it.hasNext()){
+ current = (ClosePair)it.next();
+ if ( current.sc != null) {
+ SelectionKey k =
tcpConnection.getRSL().keyFor(current.sc);
+ if (k != null) {
+ k.cancel();
+ } else {
+ k = tcpConnection.getWSL().keyFor(current.sc);
+ if (k != null) {
+ k.cancel();
+ }
+ }
}
}
+
+ //An extra check here to avoid entering the 'synchronized(closeQueue)'
scope below
+ //if we dont actually need to (locking optimization)
+ if(workList.isEmpty())
+ return;
+
+ //Finished processing all the closerequests we popped from the
+ //preCloseQueue earlier in this method, now queue them for actual
closing
+ //From here the closeThread will pick them up at its leisure...
+ synchronized (closeQueue) {
+ closeQueue.addAll(workList);
+ }
}
+ /**
+ * Do a selection operation
+ * @return true if we selected, false if something failed, the usual
+ * response would be to try again
+ */
protected final boolean mySelect(int x) throws IOException {
boolean windowsBugHappened = false;
try{
@@ -519,6 +594,20 @@
}
return false;
} else throw e;
+ } catch (ClosedSelectorException e) {
+ Core.logger.log(this, "WTF?!: "+e, e,
+ Logger.ERROR);
+ try {
+ reset();
+ } catch (Throwable t) {
+ Core.logger.log(this, "Reopening selector FAILED!:
"+t, t,
+ Logger.ERROR);
+ System.err.println("Reopening selector FAILED! Tried
to handle: "+e);
+ e.printStackTrace(System.err);
+ System.err.println("But then caught: "+t);
+ t.printStackTrace(System.err);
+ }
+ return false;
}
if(!windowsBugHappened)
this.consecutiveWindowsBugEncounters =0;
@@ -539,11 +628,11 @@
int consecutiveDuds = 0;
while(!stop) {
fastReturn = false;
- logDebug = Core.logger.shouldLog(Logger.DEBUG);
+ logDebug = Core.logger.shouldLog(Logger.DEBUG, this);
try{ //a very wide and generic net
//cancel() all keys before beforeSelect()
- handleCloseQueuePipe();
+ handleCloseQueuePipe(); // move this down to rsl --zab
beforeSelect();
@@ -558,8 +647,12 @@
//sel.selectedKeys().clear();
//select on the selector
iter++;
+ long now = System.currentTimeMillis();
if(!mySelect(timeout)) continue;
-
+ long selected = System.currentTimeMillis();
+ if(logDebug) Core.logger.log(this, "Returned from selector in
"+
+ (selected-now)+" millis",
+ Logger.DEBUG);
// Core.logger.log(this, "Returned from selector,
"+currentlyActive+
@@ -570,16 +663,13 @@
//currentSet.addAll(sel.selectedKeys());
if(logDebug)
- Core.logger.log(this, "Keys ready before fixKeys: "+
- currentlyActive+"/"+
- sel.keys().size(),
Logger.DEBUG);
+ Core.logger.log(this, "Keys ready before fixKeys: " +
currentlyActive + "/" + sel.keys().size(), Logger.DEBUG);
fixKeys();
//if (currentlyActive != currentSet.size())
Core.logger.log(this, "read the freaking book! "+ currentlyActive +" != "+
currentSet.size() ,Logger.ERROR);
currentlyActive = currentSet.size();
if(logDebug)
- Core.logger.log(this, "Keys ready:
"+currentlyActive+"/"+
- sel.keys().size(),
Logger.DEBUG);
+ Core.logger.log(this, "Keys ready: " + currentlyActive
+ "/" + sel.keys().size(), Logger.DEBUG);
//if at this point no channels are active, it means
//we were just woken up or timed out.
@@ -624,14 +714,11 @@
System.runFinalization();
freenet.node.Main.dumpInterestingObjects();
try {
- Core.logger.log(this, "Ran emergency GC in
"+getClass().getName(),
- Logger.ERROR);
+ Core.logger.log(this, "Ran emergency GC in " +
getClass().getName(), Logger.ERROR);
} catch (Throwable any) {};
} catch(Throwable t){
try {
- Core.logger.log(this,
- "Caught throwable in
AbstractSelectorLoop!: "
- +t, t, Logger.ERROR);
+ Core.logger.log(this, "Caught throwable in
AbstractSelectorLoop!: " + t, t, Logger.ERROR);
t.printStackTrace();
} catch (Throwable x) {};
}
@@ -652,7 +739,7 @@
if(chan.attachment != null)
((NIOCallback)(chan.attachment)).queuedClose();
//if(isWindows)
- synchronized(preCloseQueue) {
+ synchronized(preCloseQueueObject) {
preCloseQueue.addLast(chan);
}
/* else
@@ -664,7 +751,7 @@
}*/
}
- public void queueClose(Connection conn, SocketChannel sc) {
+ public final void queueClose(Connection conn, SocketChannel sc) {
Socket s = null;
try {
s = ((tcpConnection)conn).getSocket();
@@ -673,7 +760,7 @@
queueClose(new ClosePair(conn, null, sc));
}
- public void queueClose(Connection conn, NIOCallback cb, SocketChannel sc) {
+ public final void queueClose(Connection conn, NIOCallback cb, SocketChannel
sc) {
Socket s = null;
try {
s = ((tcpConnection)conn).getSocket();
@@ -682,33 +769,26 @@
queueClose(new ClosePair(conn, cb, sc));
}
- public void queueClose(SocketChannel chan) {
- if(!chan.isConnected()) return;
- if(!chan.isOpen()) return;
- Connection c = tcpConnection.getConnectionForSocket(chan.socket());
- if(c == null) {
- if(!chan.isConnected()) return;
- if(!chan.isOpen()) return;
- throw new IllegalArgumentException("Fed socket not connected
to a tcpConnection!: "+chan);
- }
- if (closeUniqueness.containsKey(c)) return;
- queueClose(new ClosePair(c, null, chan));
+ public final void queueClose(SocketChannel chan) {
+ queueClose(chan, null);
}
- public void queueClose(SocketChannel chan, NIOCallback nc) {
- if(!chan.isConnected()) return;
- if(!chan.isOpen()) return;
- Connection c = tcpConnection.getConnectionForSocket(chan.socket());
+ public final void queueClose(SocketChannel chan, NIOCallback nc) {
+ if(chan == null) return;
+ Socket sock = chan.socket();
+ if(sock == null) return;
+ Connection c = tcpConnection.getConnectionForSocket(sock);
if(c == null) {
- if(!chan.isConnected()) return;
- if(!chan.isOpen()) return;
- throw new IllegalArgumentException("Fed socket not connected
to a tcpConnection!: "+chan+","+nc);
+ if(chan.isConnected() && chan.isOpen())
+ throw new IllegalArgumentException("Fed socket not
connected "+
+
"to a tcpConnection!: "+chan);
+ else return;
}
if (closeUniqueness.containsKey(c)) return;
queueClose(new ClosePair(c,nc,chan));
}
- public void queueClose(ChannelAttachmentPair pair) {
+ public final void queueClose(ChannelAttachmentPair pair) {
NIOCallback cb = null;
if(pair.attachment instanceof NIOCallback)
cb = (NIOCallback)(pair.attachment);
@@ -729,11 +809,12 @@
synchronized(closeQueue) {
while(closeQueue.isEmpty()) {
try {
- closeQueue.wait(500);
+ //closeQueue.wait(500);
+ closeQueue.wait();
//Wait until someone tells us to close.. dont start closing when we aren't told to
} catch (InterruptedException
e) {};
if(terminateCloseThread)
return;
}
- current =
(ClosePair)(closeQueue.removeFirst());
+ current =
(ClosePair)closeQueue.removeFirst();
}
if(current != null) {
Connection c = current.conn;
@@ -741,8 +822,9 @@
try {
c.close(true);
} catch (Throwable t) {
- Core.logger.log(this,
"Caught "+t+" closing "+
-
c, t, Logger.ERROR);
+ Core.logger.log(this,
"Caught " + t + " closing " + c, t, Logger.ERROR);
+ } finally {
+
Core.diagnostics.occurrenceContinuous("closePairLifetime", System.currentTimeMillis()
- current.timeStamp);
}
//notify the callback
if (current.attachment !=null){
@@ -767,8 +849,7 @@
} catch (Throwable any) {};
} catch (Throwable t) {
try {
- String s = "Caught Throwable "+t+
- " in background close thread!";
+ String s = "Caught Throwable " + t + "
in background close thread!";
Core.logger.log(this, s, t,
Logger.ERROR);
// FIXME
// System.err, System.out may be files
on disk
Index: ListenSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/ListenSelectorLoop.java,v
retrieving revision 1.4.2.7
retrieving revision 1.4.2.8
diff -u -w -r1.4.2.7 -r1.4.2.8
--- ListenSelectorLoop.java 16 Aug 2003 17:30:07 -0000 1.4.2.7
+++ ListenSelectorLoop.java 28 Oct 2003 20:20:48 -0000 1.4.2.8
@@ -91,11 +91,11 @@
boolean success = true;
oomSleep = 1000;
try {
- Iterator i = currentSet.iterator();
+ Iterator i = sel.keys().iterator();
while (i.hasNext()) {
try {
SelectionKey curKey = (SelectionKey)i.next();
- i.remove(); //yes I think its a good idea
+ //i.remove(); //yes I think its a good idea //
reenable iff switch back to selectedKeys
ServerSocketChannel sc = (ServerSocketChannel)curKey.channel();
SocketChannel chan = null;
NIOListener listener = (NIOListener)curKey.attachment();
@@ -116,7 +116,8 @@
System.runFinalization();
oomSleep *= 2;
try {
- String s = "Attempted to recover from OutOfMemoryError";
+ String s = "Attempted to recover from OutOfMemoryError "+
+ e.toString();
freenet.Core.logger.log(this, s,
freenet.support.Logger.ERROR);
System.err.println(s);
Index: NIOReader.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/NIOReader.java,v
retrieving revision 1.2.2.1
retrieving revision 1.2.2.2
diff -u -w -r1.2.2.1 -r1.2.2.2
--- NIOReader.java 1 Jul 2003 02:27:18 -0000 1.2.2.1
+++ NIOReader.java 28 Oct 2003 20:20:48 -0000 1.2.2.2
@@ -1,7 +1,6 @@
/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.transport;
-
import java.nio.*;
public interface NIOReader extends NIOCallback {
/**
Index: ReadSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/ReadSelectorLoop.java,v
retrieving revision 1.17.2.8
retrieving revision 1.17.2.9
diff -u -w -r1.17.2.8 -r1.17.2.9
--- ReadSelectorLoop.java 2 Aug 2003 02:36:19 -0000 1.17.2.8
+++ ReadSelectorLoop.java 28 Oct 2003 20:20:48 -0000 1.17.2.9
@@ -30,6 +30,15 @@
private final LinkedList maintenanceQueue;
+ private class MaintenancePair {
+ NIOReader attachment;
+ SocketChannel chan;
+ MaintenancePair(NIOReader reader, SocketChannel sc) {
+ this.attachment=reader;
+ this.chan=sc;
+ }
+ }
+
//this needs to be taken from the settings.. wide guess is 2K
//public static final int BUFFER_SIZE=2048;
@@ -42,9 +51,10 @@
will be used. We can't however ignore those users on 100MBit connections
;-)) */
protected static final int MAX_CONC_CHANNELS=20;
- public ReadSelectorLoop(Bandwidth bw) throws IOException{
+ public ReadSelectorLoop(Bandwidth bw, int timerGranularity)
+ throws IOException{
- super(bw);
+ super(bw, timerGranularity);
//create the buffer stuff
bufferMap = new HashMap(MAX_CONC_CHANNELS);
// buffers = new ByteBuffer[MAX_CONC_CHANNELS];
@@ -104,25 +114,89 @@
}
//make sure no message got stuck behind a trailing field
+ if(logDebug)
+ Core.logger.log(this, "beforeSelect(), mq.size="+
+ maintenanceQueue.size(), Logger.DEBUG);
+
while (maintenanceQueue.size() > 0) {
- NIOReader current;
- //process could take a while, so lock just this
+ MaintenancePair mp;
synchronized(maintenanceQueue) {
- current = (NIOReader)maintenanceQueue.removeFirst();
+ mp = (MaintenancePair)maintenanceQueue.removeFirst();
+ }
+ SocketChannel chan = mp.chan;
+ NIOReader current = mp.attachment;
+ SelectionKey k = chan.keyFor(sel);
+ //process could take a while, so lock just this
+ try {
+ int status = current.process(null);
+ if(logDebug)
+ Core.logger.log(this, "Running maintenance on "+chan+
+ ":"+current+", returned "+status,
+ Logger.DEBUG);
+ if(status == -1) {
+ if(logDebug)
+ Core.logger.log(this, "Closing connection "+chan+":"+
+ current+" (process returned -1)",
+ Logger.DEBUG);
+ if(k != null) {
+ k.attach(null);
+ k.cancel();
+ current.unregistered();
+ } else {
+ if(logDebug)
+ Core.logger.log(this, "Maintenance process returned -1
but not registered on selector, queuing for unregistration", Logger.DEBUG);
+ unregisterWaiters.add(new ChannelAttachmentPair
+ (chan, current));
+ }
+ queueClose((SocketChannel)chan,current);
+ } else if (status == 0) {
+ if (logDebug) Core.logger.log(this, "Cancelling "+chan+
+ ":"+current+"(returned 0)",
+ Logger.DEBUG);
+ if(k != null) {
+ k.cancel();
+ } else {
+ if(logDebug)
+ Core.logger.log(this, "Maintenance process returned 0 but
not registered on selector, queuing for unregistration", Logger.DEBUG);
+ unregisterWaiters.add(new ChannelAttachmentPair
+ (chan, current));
+ }
+ synchronized(dontReregister) {
+ dontReregister.add(chan);
+ }
+ if (logDebug)Core.logger.log(this, "Cancelled "+chan+": "+
+ (k==null?"(null)":
+ Boolean.toString(k.isValid()))
+ +":"+k, Logger.DEBUG);
+ }
+ } catch (OutOfMemoryError e) {
+ System.gc();
+ System.runFinalization();
+ System.gc();
+ System.runFinalization();
+ freenet.node.Main.dumpInterestingObjects();
+ try {
+ Core.logger.log(this, "Ran emergency GC in "+
+ getClass().getName(), Logger.ERROR);
+ } catch (Throwable any) {};
+ } catch (Throwable t) {
+ System.err.println("Caught "+t+" running maintenance queue");
+ t.printStackTrace(System.err);
+ Core.logger.log(this, "Caught "+t+" running maintenance queue",
+ t, Logger.ERROR);
}
- current.process(null);
}
-
throttleBeforeSelect();
-
}
- public final void scheduleMaintenance(NIOReader cb) {
+ public final void scheduleMaintenance(SocketChannel cb,
+ NIOReader attachment) {
if(logDebug)
- Core.logger.log(this, "Scheduling maintenance on "+cb,
- new Exception("debug"),
Logger.DEBUG);
+ Core.logger.log(this, "Scheduling maintenance on "+cb+
+ ":"+attachment, new Exception("debug"),
+ Logger.DEBUG);
synchronized (maintenanceQueue) {
- maintenanceQueue.addLast(cb);
+ maintenanceQueue.addLast(new MaintenancePair(attachment, cb));
}
}
@@ -269,9 +343,14 @@
}
}
} catch (Throwable e) {
- e.printStackTrace();
+ try {
Core.logger.log(this, "Unexpected throwable reading
data for "+
nc+": "+e, e,
Logger.NORMAL);
+ e.printStackTrace();
+ } catch (Throwable t) {
+ Core.logger.log(this, "Unexpected throwable reading data: "+
+ e, e, Logger.NORMAL);
+ }
size = -1; // try unregistering it...
}
if (size == -1) {
@@ -291,7 +370,7 @@
nc.unregistered();
noneWorking = false;
} else if (size > 0 || (bumper.limit() > 0)) {
- //do not call process() if nothign was read
+ //do not call process() if nothing was read
//this actually is an error in fixkeys
if(size > 0) {
bytesRead += (size+OVERHEAD);
@@ -373,6 +452,9 @@
nc, t,
Logger.NORMAL);
status = -1;
}
+ if(logDebug)
+ Core.logger.log(this, ""+chan+":"+nc+" returned "+
+ status, Logger.DEBUG);
if(status == -1) {
if(logDebug)
Core.logger.log(this, "Closing
connection "+chan+":"+nc+
Index: ThrottledSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/ThrottledSelectorLoop.java,v
retrieving revision 1.10.2.7
retrieving revision 1.10.2.8
diff -u -w -r1.10.2.7 -r1.10.2.8
--- ThrottledSelectorLoop.java 5 Aug 2003 19:41:29 -0000 1.10.2.7
+++ ThrottledSelectorLoop.java 28 Oct 2003 20:20:48 -0000 1.10.2.8
@@ -32,6 +32,8 @@
int throttleQueueLength = 0;
+ protected boolean shortTimeout = false;
+
public final int throttleQueueLength() {
return throttleQueueLength;
}
@@ -56,11 +58,14 @@
// sync on this to prevent SelectorLoop thread from un-throttling
protected Bandwidth bw;
+ protected int timerGranularity;
- public ThrottledSelectorLoop(Bandwidth bw) throws IOException {
+ public ThrottledSelectorLoop(Bandwidth bw, int timerGranularity)
+ throws IOException {
this.bw = bw;
throttleDisabledQueue = new LinkedList();
+ this.timerGranularity = timerGranularity;
// rand = freenet.Core.randSource;
}
@@ -76,16 +81,16 @@
}
public final void throttleBeforeSelect() {
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ logDebug = Core.logger.shouldLog(Logger.DEBUG, this);
if (throttling) {
long now = System.currentTimeMillis();
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Still throttling at "+now,
Logger.DEBUG);
if(now >= reregisterThrottledTime) {
synchronized(throttleLock) {
if(bytesRemainingOnThrottle > 0) {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Calling
throttle with remaining "+
bytesRemainingOnThrottle+" bytes",
Logger.DEBUG);
@@ -94,7 +99,7 @@
if(tok.sleepUntil > 0) {
reregisterThrottledTime =
tok.sleepUntil;
bytesRemainingOnThrottle -=
tok.availableNow;
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Still not ready - sleeping for "+
(reregisterThrottledTime - now)+" ms",
Logger.DEBUG);
@@ -110,10 +115,11 @@
reregisterThrottledTime > now) {
timeout = (int)(reregisterThrottledTime - now);
}
- }
+ } else timeout = shortTimeout ? 0 : TIMEOUT;
} else
- timeout = TIMEOUT;
- if(logDEBUG)
+ timeout = shortTimeout ? 0 : TIMEOUT;
+ shortTimeout = false;
+ if(logDebug)
Core.logger.log(this, "Set timeout to "+timeout,
Logger.DEBUG);
}
@@ -158,7 +164,7 @@
// Already thinks it is registered
}
} catch (CancelledKeyException e) {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Key ("+current+
") cancelled
but not removed: "+e, e,
Logger.ERROR);
@@ -168,7 +174,7 @@
queueClose(current);
closed++;
} catch (ClosedChannelException e) {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Key ("+current+
") channel
closed but not removed: "+e,
e,
Logger.ERROR);
@@ -200,9 +206,9 @@
protected final void throttleConnections(int bytesRead, int throttledBytesRead,
int pseudoThrottledBytesRead) {
if(bw == null) return;
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ logDebug = Core.logger.shouldLog(Logger.DEBUG, this);
if(bytesRead > 0) {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Bytes moved total this loop: "+
bytesRead+", bytes
that need throttling: "+
throttledBytesRead+",
pseudo-throttled: "+
@@ -252,7 +258,7 @@
SelectionKey curKey =
(SelectionKey)(it.next());
if(!curKey.isValid()) {
- if(logDEBUG) Core.logger.log(this,
"Invalid "+curKey+"("+curKey.channel()+","+curKey.attachment()+" on selector in
throttleConnections, ignoring", Logger.DEBUG);
+ if(logDebug) Core.logger.log(this,
"Invalid "+curKey+"("+curKey.channel()+","+curKey.attachment()+" on selector in
throttleConnections, ignoring", Logger.DEBUG);
onInvalidKey(curKey);
continue;
}
@@ -279,11 +285,11 @@
throttleQueueLength =
throttleDisabledQueue.size();
deregistered++;
}
- if(logDEBUG) Core.logger.log(this,
"Deregistered "+
+ if(logDebug) Core.logger.log(this,
"Deregistered "+
curKey.attachment(), Logger.DEBUG);
}
}
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Deregistered
"+deregistered+
" keys,
TDQ.size="+
throttleDisabledQueue.size(),
@@ -329,6 +335,13 @@
dontReregister.remove(ch);
}
super.register(ch, attachment);
+ }
+
+ protected void queueClose(ClosePair chan) {
+ synchronized(dontReregister) {
+ dontReregister.remove(chan.sc);
+ }
+ super.queueClose(chan);
}
/**
Index: WriteSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/WriteSelectorLoop.java,v
retrieving revision 1.16.2.12
retrieving revision 1.16.2.13
diff -u -w -r1.16.2.12 -r1.16.2.13
--- WriteSelectorLoop.java 15 Aug 2003 03:05:56 -0000 1.16.2.12
+++ WriteSelectorLoop.java 28 Oct 2003 20:20:48 -0000 1.16.2.13
@@ -51,12 +51,24 @@
private static final int TABLE_SIZE=512;
private static final float TABLE_FACTOR=(float)0.6;
+ int minSendBytesPerThrottleCycle;
+ private static final int GRANULES_PER_THROTTLE_CYCLE = 20;
+
/**
* nothing special about this constructor
*/
- public WriteSelectorLoop(Bandwidth bw) throws IOException {
+ public WriteSelectorLoop(Bandwidth bw, int timerGranularity)
+ throws IOException {
- super(bw);
+ super(bw, timerGranularity);
+ minSendBytesPerThrottleCycle =
+ (bw.currentBandwidthPerSecondAllowed() *
+ GRANULES_PER_THROTTLE_CYCLE *
+ timerGranularity) / 1000;
+ Core.logger.log(this, "Minimum send size: "+
+ minSendBytesPerThrottleCycle+" per "+
+ (GRANULES_PER_THROTTLE_CYCLE *
timerGranularity)+
+ "ms", Logger.MINOR);
jobs = new BlockingQueue();
uniqueness = new Hashtable(TABLE_SIZE,TABLE_FACTOR);
sorter=new QuickSorter();
@@ -68,6 +80,7 @@
public WriteSelectorLoop() throws IOException {
super();
+ minSendBytesPerThrottleCycle = Integer.MAX_VALUE;
jobs = new BlockingQueue();
uniqueness = new Hashtable(TABLE_SIZE,TABLE_FACTOR);
sorter=new QuickSorter();
@@ -76,7 +89,7 @@
protected Object idSync = new Object();
protected long idCount = 0;
public final static int maxDelay = 2000;
- public static final int NEGOTIATION = -1;
+ public static final int NEGOTIATION = -10;
public static final int MESSAGE = 0;
public static final int TRAILER = 1;
/**
@@ -174,7 +187,7 @@
public final String toString() {
return SendJob.this.getClass().getName()+":
"+data+","+destination+
- ","+client+","+position+","+id;
+ ","+client+","+position+","+id+",prio="+priority;
}
public final void closed() {
@@ -235,7 +248,8 @@
if (data.length==0 || destination ==null) throw new IOException ("no
data to send?");
- if (!checkValid(destination))return false;
+ if (!checkValid(destination, client))
+ return false;
SendJob job = new SendJob(data,destination,client, priority);
@@ -255,7 +269,8 @@
if (len==0 || destination==null) throw new IOException ("no data to
send?");
- if (!checkValid(destination))return false;
+ if (!checkValid(destination, client))
+ return false;
SendJob job = new SendJob(data,offset,len,destination,client,priority);
@@ -270,13 +285,14 @@
return true;
}
- private final boolean checkValid(SelectableChannel destination) {
+ private final boolean checkValid(SelectableChannel destination,
+ NIOWriter
client) {
//if we are already registered, return false
SelectionKey key = destination.keyFor(sel);
if (key != null && key.isValid()) {
- if(logDebug)
- Core.logger.log(this, "Send failed due to valid key",
- Logger.DEBUG);
+ Core.logger.log(this, "Send failed due to valid key for "+
+ destination+":"+client,
+ new Exception("debug"),
Logger.ERROR);
return false;
}
@@ -325,8 +341,7 @@
//this queue is local to the thread.
LinkedList waitingJobs = new LinkedList();
- boolean logDEBUG = logDebug;
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "beforeSelect()", Logger.DEBUG);
// first check if any channel is registered in the
// selector.
@@ -348,13 +363,13 @@
if (sel.keys().isEmpty() && jobs.isEmpty()) {
//check if the jobs queue is empty,
//and if it is, block on it.
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Waiting for
job to add to "+
"queue", Logger.DEBUG);
long delay = 0;
synchronized(throttleLock) {
if(throttling) {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Throttling at "+now+
" until "+
reregisterThrottledTime,
@@ -369,7 +384,7 @@
}
int x = (delay > Integer.MAX_VALUE) ?
Integer.MAX_VALUE
: ((int)delay);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Delay will be
"+x+" ms: ",
Logger.DEBUG);
Object o = jobs.dequeue(x);
@@ -381,7 +396,7 @@
ChannelAttachmentPair
pair =
new
ChannelAttachmentPair(current.destination, current);
throttleDisabledQueue.addLast(pair);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Moving new job "+
pair+" onto delay queue",
Logger.DEBUG);
@@ -389,7 +404,7 @@
}
}
waitingJobs.add(o);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Dequeued job",
Logger.DEBUG);
continue;
@@ -408,20 +423,20 @@
ChannelAttachmentPair
pair =
new
ChannelAttachmentPair(current.destination, current);
throttleDisabledQueue.addLast(pair);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Moving new job "+
pair+" onto delay queue",
Logger.DEBUG);
continue;
}
}
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Copying job "+current+
" to queue", Logger.DEBUG);
waitingJobs.add(current);
}
long endTime = System.currentTimeMillis();
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Took
"+(endTime-startTime)+
"
millis copying queue ("+iter+")",
Logger.DEBUG);
@@ -439,15 +454,15 @@
while (i.hasNext()) {
SendJob currentJob = (SendJob) i.next();
try {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Registering channel
"+currentJob+
" with
selector", Logger.DEBUG);
currentJob.destination.register(sel,
SelectionKey.OP_WRITE, currentJob);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Registered channel
"+currentJob+
" with
selector", Logger.DEBUG);
}catch (ClosedChannelException e) {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Channel closed:
"+currentJob+": "+e,
e,
Logger.DEBUG);
queueClose(((SocketChannel)currentJob.destination),
@@ -472,10 +487,6 @@
}
}
- //check if any of the channels got closed;
- //REDFLAG: the behavior of select() on channels that have
- //been closed remotely is not tested.
- //TEST PROPERLY AND IMPLEMENT CHECKS HERE!!!
protected final boolean inspectChannels() {
if(logDebug)
Core.logger.log(this, "inspectChannels()", Logger.DEBUG);
@@ -489,8 +500,7 @@
//at this stage the selected set should contain only channels
//that are ready to be written to and have something to be sent.
protected final boolean processConnections() {
- boolean logDEBUG = logDebug;
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "processConnections()", Logger.DEBUG);
boolean success = true;
int throttledBytes = 0;
@@ -512,7 +522,6 @@
try{
boolean noThrottled = (System.currentTimeMillis() <
reregisterThrottledTime);
- boolean noMoreThrottled = false;
// Some of them may have been enableThrottle()d off thread
while (i.hasNext()) {
boolean localSuccess = true;
@@ -538,12 +547,13 @@
if(currentJob.destination instanceof SocketChannel &&
(!((SocketChannel)(currentJob.destination)).
isConnected())) throw new IOException("not
connected");
- if(currentJob.client.shouldThrottle()) {
- if(noMoreThrottled) {
+ if(bw != null && currentJob.client.shouldThrottle()) {
+ if(throttledBytes >
minSendBytesPerThrottleCycle) {
// Will get cancelled by
throttleConnections
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Skipping (A) throttled "+
currentJob, Logger.DEBUG);
+ shortTimeout = true; // just in case
the bytes are absorbed by the throttle
continue;
} else if(noThrottled) {
// May not get cancelled by
throttleConnections
@@ -571,13 +581,13 @@
if(currentJob.data.remaining() > lim) {
currentJob.data.limit(currentJob.data.position() +
lim);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Limited: "+currentJob.data+
" for "+currentJob.client+
", limit was "+oldLimit, Logger.DEBUG);
}
} else {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Did not limit,
"+
currentJob.data.remaining()+"/"+
(bw==null?"(null)": Integer.
@@ -607,7 +617,7 @@
if(currentJob.client.shouldThrottle()) {
throttledBytes += (sent+OVERHEAD);
totalWrittenThrottlableBytes +=
(sent+OVERHEAD);
- if(logDEBUG) {
+ if(logDebug) {
Core.logger.log(this, "Should
throttle "+
currentJob, Logger.DEBUG);
Core.logger.log(this, "Written
"+sent+
@@ -632,11 +642,11 @@
if(currentJob.client.countAsThrottled()) {
pseudoThrottledBytes +=
(sent+OVERHEAD);
totalWrittenPseudoThrottlableBytes += (sent+OVERHEAD);
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Pseudo-throttle "+
currentJob, Logger.DEBUG);
} else {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this,
"Should not throttle "+
currentJob, Logger.DEBUG);
}
@@ -667,8 +677,16 @@
curKey+") from
uniqueness, now "+
uniqueness.size(), Logger.DEBUG);
try {
- currentJob.client.jobDone(currentJob.data.position()-currentJob.position,
- localSuccess);
+ if(currentJob.data == null ||
+ currentJob.client == null)
+ Core.logger.log(this,
"currentJob.data="+currentJob.data+
+
", currentJob.client="+currentJob.client,
+
Logger.ERROR);
+ else {
+ int delta =
currentJob.data.position() -
+ currentJob.position;
+
currentJob.client.jobDone(delta, localSuccess);
+ }
} catch (Throwable t) {
Core.logger.log(this, "Caught "+t+"
notifying "+
currentJob.client+" for "+
@@ -685,17 +703,11 @@
currentJob.client.jobPartDone(currentJob.data.position()-currentJob.position);
}
}
- if(currentJob.client.shouldThrottle() && sent > 0) {
- // Throttle this one first
- if (logDebug)Core.logger.log(this, "Set
noMoreThrottled because of "+
- currentJob,
Logger.DEBUG);
- noMoreThrottled = true;
}
- }
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Written "+bytesSent, Logger.DEBUG);
if(throttledBytes != 0 || pseudoThrottledBytes != 0) {
- if(logDEBUG)
+ if(logDebug)
Core.logger.log(this, "Written "+throttledBytes+
" bytes that should be
throttled",
Logger.DEBUG);
@@ -753,6 +765,7 @@
c = (SocketChannel[])(uniqueness.keySet().toArray(c));
for(int x=0;x<c.length;x++) {
SocketChannel chan = c[x];
+ if(chan == null) continue; // race condition
if(!chan.isOpen()) {
out.append("NOT OPEN: ").append(chan);
Object o = uniqueness.get(chan);
Index: tcpAddress.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/tcpAddress.java,v
retrieving revision 1.4.4.5.2.5
retrieving revision 1.4.4.5.2.6
diff -u -w -r1.4.4.5.2.5 -r1.4.4.5.2.6
--- tcpAddress.java 29 Jul 2003 21:08:10 -0000 1.4.4.5.2.5
+++ tcpAddress.java 28 Oct 2003 20:20:48 -0000 1.4.4.5.2.6
@@ -23,12 +23,17 @@
private InetAddress host;
private String hostName = null;
private int port;
+ private int hashCode; // has to be invariant with lookups etc
public static boolean throttleAll = false;
private String valname = "";
private final tcpTransport transport;
+ public int hashCode() {
+ return hashCode;
+ }
+
public boolean equals(tcpAddress tcp) {
if(tcp == null) return false;
if(tcp.port != port) return false;
@@ -58,11 +63,15 @@
this(transport);
setPort(port);
this.host = host;
- if(host!=null)
+ if(host!=null) {
valname = host.getHostAddress();
- else
+
+ hashCode = port ^ valname.hashCode();//host.hashCode() ^ valname.hashCode();
+ } else {
+ hashCode = port;
valname = "";
}
+ }
/**
* Creates an address from a host name or IP string and port number.
@@ -75,6 +84,7 @@
host = null;
//doDeferredLookup();
valname = hostname;
+ hashCode = port ^ valname.hashCode();
}
/**
@@ -96,6 +106,7 @@
throw new BadAddressException(""+e);
}
setPort(Integer.parseInt(str.substring(colon + 1)));
+ hashCode = port ^ valname.hashCode();
}
public final void doDeferredLookup() throws UnknownHostException {
Index: tcpConnection.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/tcpConnection.java,v
retrieving revision 1.8.4.6.2.10
retrieving revision 1.8.4.6.2.11
diff -u -w -r1.8.4.6.2.10 -r1.8.4.6.2.11
--- tcpConnection.java 16 Aug 2003 17:30:08 -0000 1.8.4.6.2.10
+++ tcpConnection.java 28 Oct 2003 20:20:48 -0000 1.8.4.6.2.11
@@ -1,12 +1,15 @@
package freenet.transport;
import freenet.*;
+import freenet.node.Node; // for listenPort
+import freenet.node.Main; // for Main.timerGranularity... fixme?
import freenet.support.io.*;
import freenet.support.Logger;
import java.io.*;
import java.net.Socket;
import java.net.InetAddress;
import java.util.HashMap;
+import java.util.LinkedList;
import java.nio.*;
public final class tcpConnection extends Connection {
@@ -23,6 +26,7 @@
private boolean shouldThrottle = false;
private boolean shouldThrottleNow = false;
private boolean instanceCounted = false; // if the constructor throws, WE STILL GET FINALIZED!
+ private static boolean logDEBUG;
// We do not throttle in FnpLink because it slows down negotiations drastically
// We do not directly throttle, ever, because of nio.
// Hence throttling is implemented in *SelectorLoop, not here
@@ -40,11 +44,21 @@
static WriteSelectorLoop wsl;
private static Bandwidth ibw = null;
private static Bandwidth obw = null;
+ private static boolean poolBuffers = true;
+ private static LinkedList bufferPool = new LinkedList();
+ private static boolean useDirectBuffers = false;
+ private static int BUFFER_SIZE = 16384;
private static final int streamBufferSize() {
return freenet.Core.streamBufferSize;
}
+ public static int bufferPoolSize() {
+ synchronized(bufferPool) {
+ return bufferPool.size();
+ }
+ }
+
public static final HashMap socketConnectionMap = new HashMap();
static synchronized public void setInputBandwidth(Bandwidth bw) {
@@ -63,17 +77,24 @@
// Start NIO loops
try {
if(rsl == null) {
- rsl = new ReadSelectorLoop(ibw);
+ if (ibw != null)
+ rsl = new ReadSelectorLoop(ibw, Main.timerGranularity);
+ else
+ rsl = new ReadSelectorLoop();
Thread rslThread = new Thread(rsl, " read interface thread");
rslThread.setDaemon(true);
- rslThread.setPriority(Thread.MAX_PRIORITY);
+ //rslThread.setPriority(Thread.MAX_PRIORITY);
rslThread.start(); // inactive until given registrations
}
if(wsl == null) {
- wsl = new WriteSelectorLoop(obw);
+ if (obw != null)
+ wsl = new WriteSelectorLoop(obw, Main.timerGranularity);
+ else
+ wsl= new WriteSelectorLoop();
+
Thread wslThread = new Thread(wsl, " write interface thread");
wslThread.setDaemon(true);
- wslThread.setPriority(Thread.MAX_PRIORITY);
+ //wslThread.setPriority(Thread.MAX_PRIORITY);
wslThread.start(); // inactive until given jobs
}
} catch (Throwable t) {
@@ -96,7 +117,7 @@
public boolean countAsThrottled() { return shouldThrottle; };
public void enableThrottle() {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Enabling throttle for "+this,
new Exception("debug"), Logger.DEBUG);
shouldThrottleNow = shouldThrottle;
@@ -142,6 +163,35 @@
public static boolean logBytes = false;
+ private final ByteBuffer getAccumulator() {
+ if(poolBuffers) {
+ synchronized(bufferPool) {
+ if(!bufferPool.isEmpty()) {
+ accumulator = (ByteBuffer)(bufferPool.removeFirst());
+ if(logDEBUG)
+ Core.logger.log(this, "Reused buffer from pool: "+
+ accumulator, Logger.DEBUG);
+
+ } else {
+ accumulator = useDirectBuffers ?
+ ByteBuffer.allocateDirect(BUFFER_SIZE) :
+ ByteBuffer.allocate(BUFFER_SIZE);
+ if(logDEBUG)
+ Core.logger.log(this, "Allocated new buffer because pool empty: "+
+ accumulator, Logger.DEBUG);
+ }
+ }
+ } else {
+ accumulator = useDirectBuffers ?
+ ByteBuffer.allocateDirect(BUFFER_SIZE) :
+ ByteBuffer.allocate(BUFFER_SIZE);
+ if(logDEBUG)
+ Core.logger.log(this, "Allocated new buffer because not pooling: "+
+ accumulator, Logger.DEBUG);
+ }
+ return accumulator;
+ }
+
/**
* Used to create an outbound connection.
*/
@@ -150,7 +200,7 @@
throws ConnectFailedException {
this(t);
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG)
Core.logger.log(this, "tcpConnection (outbound)",
new Exception("debug"), Logger.DEBUG);
@@ -173,7 +223,7 @@
/** NIO related stuff***/
sock.getChannel().configureBlocking(false);
- accumulator = ByteBuffer.allocate(16*1024); //FIXME:hardcoded
+ accumulator = getAccumulator();
accumulator.limit(0).position(0);
nioout = new NIOOutputStream(sock.getChannel(),this);
nioin = new NIOInputStream(accumulator,sock.getChannel(),this);
@@ -274,7 +324,7 @@
protected final void logInstances(String s) {
synchronized(profLock) {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, s+" ("+this+") instances: "+instances+
", openInstances: "+openInstances+
", created: "+createdInstances+
@@ -292,7 +342,7 @@
tcpConnection(tcpTransport t, Socket sock, int designator,
boolean dontThrottle, boolean doThrottle) throws IOException {
this(t);
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG)
Core.logger.log(this, "tcpConnection (inbound)", Logger.DEBUG);
@@ -301,7 +351,7 @@
/** NIO related stuff***/
sock.getChannel().configureBlocking(false);
- accumulator = ByteBuffer.allocate(16*1024); //FIXME:hardcoded
+ accumulator = getAccumulator();
accumulator.limit(0).position(0);
nioout = new NIOOutputStream(sock.getChannel(),this);
nioin = new NIOInputStream(accumulator,sock.getChannel(),this);
@@ -392,7 +442,8 @@
* i.e. position = 0, limit = end of bytes available ("flipped")
*/
public ByteBuffer getInputBuffer() {
- return accumulator;
+ if(closed) return null;
+ else return accumulator;
}
Exception closeException;
@@ -403,7 +454,7 @@
public final void close(boolean fromCloseThread) {
closeException = new Exception("debug");
- boolean logDEBUG =Core.logger.shouldLog(Logger.DEBUG);
+ logDEBUG =Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG)
Core.logger.log(this, "Closing("+fromCloseThread+
") tcpConnection "+this,
@@ -484,13 +535,28 @@
if(finalized) return;
finalized = true;
logInstances("Finalizing");
- if(!closed) Core.logger.log(this, "finalized without being closed!"+this,
Logger.NORMAL);
+ if(!closed) Core.logger.log(this, "finalized without being closed!"+this,
+ Logger.NORMAL);
+ // Accumulator will not be reused after closure
+ if(poolBuffers) {
+ if(accumulator != null) {
+ synchronized(bufferPool) {
+ bufferPool.addLast(accumulator);
+ accumulator = null;
+ }
+ } else {
+ if(logDEBUG)
+ Core.logger.log(this, "Not repooling accumulator because is null: "+
+ this, Logger.DEBUG);
+ }
+ }
try {
close(true);
} catch (Throwable t) {
Core.logger.log(this, "Caught "+t+" closing "+this+" in finalize()", t,
Logger.NORMAL);
}
- if(!reallyClosed) Core.logger.log(this, "finalized without being reallyClosed!: "+this, Logger.NORMAL);
+ if(!reallyClosed) Core.logger.log(this, "finalized without being reallyClosed!: "+this,
+ Logger.NORMAL);
//profiling
//WARNING:remove before release
synchronized(profLock) {
@@ -503,11 +569,13 @@
}
public final InputStream getIn() {
- return in;
+ if(closed) return null;
+ else return in;
}
public final NIOInputStream getUnderlyingIn() {
- return nioin;
+ if(closed) return null;
+ else return nioin;
}
public final OutputStream getOut() {
@@ -520,7 +588,7 @@
sock.setSoTimeout(timeout);
else
throw new IOException("Already closed "+this);
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Set SO_TIMEOUT to "+timeout+" for "+this,
Logger.DEBUG);
}
@@ -580,10 +648,18 @@
sb.append("CLOSED"); //you won't believe it till you see it with
} else if (sock!=null) {
InetAddress addr = sock.getInetAddress();
+ int localPort = sock.getLocalPort();
+ if (localPort != Node.listenPort) {
+ sb.append(localPort);
+ sb.append(">");
+ }
if(addr != null) //this is becoming increasingly common --zab
sb.append(addr.getHostAddress());
sb.append(":");
sb.append(sock.getPort());
+ if (localPort == Node.listenPort) {
+ sb.append(">local");
+ }
} else sb.append("null");
sb.append(","+super.toString());
return sb.toString();
Index: tcpTransport.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/tcpTransport.java,v
retrieving revision 1.1.1.1.4.4.2.2
retrieving revision 1.1.1.1.4.4.2.3
diff -u -w -r1.1.1.1.4.4.2.2 -r1.1.1.1.4.4.2.3
--- tcpTransport.java 15 Jul 2003 14:29:40 -0000 1.1.1.1.4.4.2.2
+++ tcpTransport.java 28 Oct 2003 20:20:48 -0000 1.1.1.1.4.4.2.3
@@ -56,7 +56,7 @@
public static boolean checkAddress(int[] i) {
// ip address (IPV6 is not supported by this transport)
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,tcpTransport.class);
if(logDEBUG)
Core.logger.log(tcpTransport.class, "Checking "+i[0]+"."+i[1]+"."+i[2]+
"."+i[3], Logger.DEBUG);
@@ -101,7 +101,7 @@
}
public boolean checkAddress(String s, boolean noPort) {
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG)
Core.logger.log(this, "Checking "+s, Logger.DEBUG);
String a = s;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs