Update of /cvsroot/freenet/freenet/src/freenet/interfaces
In directory sc8-pr-cvs1:/tmp/cvs-serv25857/src/freenet/interfaces
Modified Files:
Tag: stable
BaseLocalNIOInterface.java FreenetConnectionRunner.java
Interface.java LocalHTTPInterface.java LocalInterface.java
LocalNIOInterface.java
Log Message:
5029: Merge from unstable after months of work. MASSIVE changes.
Highlights:
* Next Generation Routing, massive related changes
* Major changes to handling of messages and connections (PeerHandler and related
changes)
* Even more non-blocking I/O
* Documentation improvements
* Lots of new diagnostics and config options
* Lots of bug fixes and performance tweaking
* Probably lots of new bugs too!
Index: BaseLocalNIOInterface.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/interfaces/BaseLocalNIOInterface.java,v
retrieving revision 1.2.2.3
retrieving revision 1.2.2.4
diff -u -w -r1.2.2.3 -r1.2.2.4
--- BaseLocalNIOInterface.java 23 Jul 2003 16:21:13 -0000 1.2.2.3
+++ BaseLocalNIOInterface.java 28 Oct 2003 20:20:32 -0000 1.2.2.4
@@ -15,7 +15,8 @@
*/
public abstract class BaseLocalNIOInterface extends NIOInterface {
- protected int[][] allowedHosts;
+ private int[][] allowedHosts;
+ boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
private static final int intAddress(String addr) {
try {
@@ -26,18 +27,39 @@
}
private static final int intAddress(InetAddress addr) {
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
- if(logDEBUG) Core.logger.log(LocalInterface.class, "intAddress("+
- addr.toString()+")", Logger.DEBUG);
+ boolean logDEBUG =
+ Core.logger.shouldLog(Logger.DEBUG,
BaseLocalNIOInterface.class);
+ if (logDEBUG)
+ Core.logger.log(
+ LocalNIOInterface.class,
+ "intAddress(" + addr.toString() + ")",
+ Logger.DEBUG);
byte[] b = addr.getAddress();
- if(logDEBUG) Core.logger.log(LocalInterface.class, "Address: "+(((int)b[0]) &
0xff) +
- "."+(((int)b[1]) & 0xff)+"."+(((int)b[2]) &
0xff)+"."+
- (((int)b[3]) & 0xff)+" ("+
- b.length+")", Logger.DEBUG);
- long x = ((((long)b[0]) & 0xff) << 24) + ((((int)b[1]) & 0xff) << 16) +
- ((((int)b[2]) & 0xff) << 8) + (((int)b[3]) & 0xff);
- if(logDEBUG) Core.logger.log(LocalInterface.class, "Returning "+
- Fields.longToHex(x), Logger.DEBUG);
+ if (logDEBUG)
+ Core.logger.log(
+ LocalNIOInterface.class,
+ "Address: "
+ + (((int) b[0]) & 0xff)
+ + "."
+ + (((int) b[1]) & 0xff)
+ + "."
+ + (((int) b[2]) & 0xff)
+ + "."
+ + (((int) b[3]) & 0xff)
+ + " ("
+ + b.length
+ + ")",
+ Logger.DEBUG);
+ long x =
+ ((((long) b[0]) & 0xff) << 24)
+ + ((((int) b[1]) & 0xff) << 16)
+ + ((((int) b[2]) & 0xff) << 8)
+ + (((int) b[3]) & 0xff);
+ if (logDEBUG)
+ Core.logger.log(
+ LocalNIOInterface.class,
+ "Returning " + Fields.longToHex(x),
+ Logger.DEBUG);
return (int) x;
}
@@ -55,7 +77,6 @@
return out;
}
- int runningConnections = 0; // number of connections running
int lowRunningConnections; // reenable interface when go below this
int highRunningConnections; // disable interface when go above this
@@ -103,6 +124,17 @@
int highRunningConnections)
throws ListenException {
super(listenAddr);
+ setAllowedHosts(allowedHosts);
+ this.lowRunningConnections = lowRunningConnections;
+ this.highRunningConnections = highRunningConnections;
+ this.listener = getListener(listenAddr);
+ }
+
+ /**
+ * @param allowedHosts set of Addresses to do an equalsHost() check with;
+ * null means to allow all hosts
+ */
+ public void setAllowedHosts(String allowedHosts){
int[][] allowedHostsAddr = null;
if (allowedHosts == null || allowedHosts.trim().equals("")) {
@@ -113,7 +145,7 @@
allowedHosts = "0.0.0.0/0";
}
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(logDEBUG)
Core.logger.log(this, "New BaseLocalNIOInterface: "+listenAddr+
","+allowedHosts, Logger.DEBUG);
@@ -122,44 +154,43 @@
allowedHostsAddr = new int [hosts.length][2];
for (int i = 0; i < hosts.length; ++i) {
- int host, subnet, div = hosts[i].indexOf('/');
- if (div == -1) {
+ hostAndSubnetPair h = parseHostOrNetString(hosts[i]);
+ allowedHostsAddr[i][0] = mask(h.host, h.subnet);
+ allowedHostsAddr[i][1] = h.subnet;
+ }
+ this.allowedHosts = allowedHostsAddr;
+ }
+ private class hostAndSubnetPair{
+ public int host,subnet;
+ hostAndSubnetPair(int host, int subnet){
+ this.host = host;
+ this.subnet = subnet;
+ }
+ }
+ private hostAndSubnetPair parseHostOrNetString(String hostOrNetString){
+ int host, subnet, div = hostOrNetString.indexOf('/');
+ if (div == -1) { //Consider the absence of a subnetmask as
255.255.255.255 (=only the exact host specified)
subnet = 32;
- host = intAddress(hosts[i]);
+ host = intAddress(hostOrNetString);
} else {
- subnet = Integer.parseInt(hosts[i].substring(div+1));
- host = intAddress(hosts[i].substring(0,div));
+ subnet = Integer.parseInt(hostOrNetString.substring(div+1));
+ host = intAddress(hostOrNetString.substring(0,div));
}
- allowedHostsAddr[i][0] = mask(host, subnet);
- allowedHostsAddr[i][1] = subnet;
+ return new hostAndSubnetPair(host,subnet);
}
- this.allowedHosts = allowedHostsAddr;
- this.lowRunningConnections = lowRunningConnections;
- this.highRunningConnections = highRunningConnections;
- this.listener = getListener(listenAddr);
- }
-
- /**
- * Description of the Method
- *
- * @param conn Description of the Parameter
- * @exception RejectedConnectionException Description of the Exception
- */
- protected void dispatch(Connection conn) throws RejectedConnectionException {
+ private boolean hostAllowed(Address addr)
+ throws RejectedConnectionException {
boolean allow = false;
- Address ha = conn.getPeerAddress();
- if(Core.logger.shouldLog(Core.logger.DEBUG))
- Core.logger.log(this,
- "Dispatching connection on a BaseLocalNIOInterface from " +
- ha.toString(), Core.logger.DEBUG);
//insert some code to make sure ha is a tcpAddress and
//handle things correctly when it's not. --thelema
int inta;
try {
- inta = intAddress(((tcpAddress) ha).getHost());
+ inta = intAddress(((tcpAddress) addr).getHost());
} catch (java.net.UnknownHostException e) {
- Core.logger.log(this, "Unknown Host on incoming connection!!",
+ Core.logger.log(
+ this,
+ "Unknown Host on incoming connection!!",
Core.logger.ERROR);
throw new RejectedConnectionException("unknown host on incoming
connection!");
}
@@ -168,18 +199,39 @@
int subnet = allowedHosts[i][0];
int maskbits = allowedHosts[i][1];
allow |= (mask(inta, maskbits) == subnet);
- if(Core.logger.shouldLog(Core.logger.DEBUG))
- Core.logger.log(this, "Trying "+Fields.intToHex(subnet)+
- ":"+Fields.intToHex(maskbits)+" for "+
- Fields.intToHex(inta), Core.logger.DEBUG);
+ if (logDEBUG)
+ Core.logger.log(
+ this,
+ "Trying "
+ + Fields.intToHex(subnet)
+ + ":"
+ + Fields.intToHex(maskbits)
+ + " for "
+ + Fields.intToHex(inta),
+ Core.logger.DEBUG);
}
+ return allow;
+ }
+
- if (allow) {
+ /**
+ * Description of the Method
+ *
+ * @param conn Description of the Parameter
+ * @exception RejectedConnectionException Thrown when connections from the
connecting host isn't allowed
+ */
+ protected void dispatch(Connection conn) throws RejectedConnectionException {
+ logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
+ Address ha = conn.getPeerAddress();
+ if(logDEBUG)
+ Core.logger.log(this,
+ "Dispatching connection on a BaseLocalNIOInterface
from " +
+ ha.toString(), Core.logger.DEBUG);
+ if ( hostAllowed(ha)) {
handleConnection(conn);
} else {
- if(Core.logger.shouldLog(Core.logger.DEBUG))
- Core.logger.log(this, "Rejecting local connection",
- Core.logger.DEBUG);
+ if(logDEBUG)
+ Core.logger.log(this, "Rejecting local connection",
Core.logger.DEBUG);
throw new RejectedConnectionException("host not allowed: " + ha);
}
}
Index: FreenetConnectionRunner.java
===================================================================
RCS file:
/cvsroot/freenet/freenet/src/freenet/interfaces/FreenetConnectionRunner.java,v
retrieving revision 1.2.4.2.2.6
retrieving revision 1.2.4.2.2.7
diff -u -w -r1.2.4.2.2.6 -r1.2.4.2.2.7
--- FreenetConnectionRunner.java 16 Aug 2003 17:30:06 -0000 1.2.4.2.2.6
+++ FreenetConnectionRunner.java 28 Oct 2003 20:20:32 -0000 1.2.4.2.2.7
@@ -43,7 +43,7 @@
public void handle(Connection conn) {
ConnectionHandler ch = null;
- boolean logDEBUG = core.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG = core.logger.shouldLog(Logger.DEBUG,this);
boolean success = false;
long startMs = System.currentTimeMillis();
@@ -96,8 +96,9 @@
}
NIOInputStream niois = (NIOInputStream)
((tcpConnection)conn).getUnderlyingIn();
niois.setNextReader(ch);
- if(logDEBUG) Core.logger.log(this, "NIOIS was "+niois+" for "+conn+
- " (to be "+ch+")", Logger.DEBUG);
+ if(logDEBUG) Core.logger.log(this, "NIOIS was "+niois+" for "+
+ conn+" (to be "+ch+")",
+ Logger.DEBUG);
tcpConnection.getRSL().unregister(niois);
while(niois.isRegistered() && (!niois.alreadyClosedLink())) {
synchronized(niois.unregLock) {
@@ -130,6 +131,8 @@
// Some done by niois.unregister
// ch.configRSL(tcpConnection.getRSL());
ch.configWSL(tcpConnection.getWSL());
+ if(!ch.isOpen())
+ throw new IOException("Conn closed after setting WSL: "+ch);
ch.registerOCM();
// tcpConnection.getRSL().register(sock, ch);
// tcpConnection.getRSL().scheduleMaintenance(ch);
Index: Interface.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/interfaces/Interface.java,v
retrieving revision 1.3.4.3.2.6
retrieving revision 1.3.4.3.2.7
diff -u -w -r1.3.4.3.2.6 -r1.3.4.3.2.7
--- Interface.java 4 Jul 2003 02:45:08 -0000 1.3.4.3.2.6
+++ Interface.java 28 Oct 2003 20:20:32 -0000 1.3.4.3.2.7
@@ -110,7 +110,7 @@
*/
public synchronized final void listen(boolean listen) {
if (listen) {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Starting listening "+this,
new Exception("debug"),
wasStopped ? Logger.NORMAL : Logger.DEBUG);
@@ -153,7 +153,7 @@
executor = Thread.currentThread();
while (!terminated) {
if (!listening) {
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
synchronized (this) {
while (!listening && !terminated) {
try {
@@ -202,7 +202,7 @@
protected abstract void starting();
private void acceptConnections() {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Accepting connections", Logger.DEBUG);
while (listening && !terminated) {
try {
Index: LocalHTTPInterface.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/interfaces/LocalHTTPInterface.java,v
retrieving revision 1.3.2.3
retrieving revision 1.3.2.4
diff -u -w -r1.3.2.3 -r1.3.2.4
--- LocalHTTPInterface.java 5 Aug 2003 19:41:28 -0000 1.3.2.3
+++ LocalHTTPInterface.java 28 Oct 2003 20:20:32 -0000 1.3.2.4
@@ -30,14 +30,10 @@
public class LocalHTTPInterface extends BaseLocalNIOInterface {
- public final ReadSelectorLoop rsl;
- public final WriteSelectorLoop wsl;
public final MultipleHttpServletContainer container;
public final SessionHolder sh = new SessionHolderImpl();
public LocalHTTPInterface(ListeningAddress listenAddr,
- ReadSelectorLoop rsl,
- WriteSelectorLoop wsl,
MultipleHttpServletContainer
container,
String allowedHosts,
int lowRunningConnections,
@@ -45,8 +41,6 @@
throws ListenException {
super(listenAddr, allowedHosts, lowRunningConnections,
highRunningConnections);
- this.rsl = rsl;
- this.wsl = wsl;
this.container = container;
}
@@ -71,7 +65,7 @@
SocketChannel sc = sock.getChannel();
Core.logger.log(this, "Got channel: "+sc, Logger.DEBUG);
HTTPCallback hc = new HTTPCallback(sc, conn);
- hc.configWSL(wsl);
+ hc.configWSL(tcpConnection.getWSL());
niois.setNextReader(hc);
tcpConnection.getRSL().unregister(niois);
Core.logger.log(this, "Almost registered channel", Logger.DEBUG);
@@ -220,6 +214,7 @@
Core.logger.log(this, "Sending packet of length "+buf.length,
Logger.DEBUG);
try {
+ WriteSelectorLoop wsl = tcpConnection.getWSL();
if(!wsl.send(buf, sc, this,wsl.MESSAGE)) {
Core.logger.log(this, "Failed send",
Logger.DEBUG);
sending.add(buf);
@@ -241,6 +236,7 @@
if(sending.size() > 0) {
byte[] buf = (byte[])(sending.remove(0));
try {
+ WriteSelectorLoop wsl =
tcpConnection.getWSL();
if(!wsl.send(buf, sc,
this,wsl.MESSAGE)) {
Core.logger.log(this, "Could
not send data in jobDone handler!", Logger.ERROR);
}
Core.logger.log(this, "Sent some data in jobDone",
@@ -250,7 +246,7 @@
Logger.ERROR);
}
} else if (closeAfterSent) {
- rsl.queueClose(sc);
+ tcpConnection.getRSL().queueClose(sc);
// WriteSelectorLoop doesn't know about it
unless we are actually sending data
}
}
Index: LocalInterface.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/interfaces/LocalInterface.java,v
retrieving revision 1.7.2.8.2.3
retrieving revision 1.7.2.8.2.4
diff -u -w -r1.7.2.8.2.3 -r1.7.2.8.2.4
--- LocalInterface.java 29 Apr 2003 00:54:14 -0000 1.7.2.8.2.3
+++ LocalInterface.java 28 Oct 2003 20:20:32 -0000 1.7.2.8.2.4
@@ -1,3 +1,4 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.interfaces;
import freenet.*;
@@ -28,7 +29,7 @@
}
private static final int intAddress(InetAddress addr) {
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG =
Core.logger.shouldLog(Logger.DEBUG,LocalNIOInterface.class);
if(logDEBUG) Core.logger.log(LocalInterface.class, "intAddress("+
addr.toString()+")", Logger.DEBUG);
byte[] b = addr.getAddress();
@@ -222,7 +223,7 @@
protected void dispatch(Connection conn) throws RejectedConnectionException {
boolean allow = false;
Address ha = conn.getPeerAddress();
- boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG) Core.logger.log(this,
"Dispatching connection on a LocalInterface from
" +
ha.toString(), Logger.DEBUG);
@@ -248,9 +249,12 @@
if (allow) {
Thread t = tf.getThread(new ConnectionShell(conn));
-
- if(logDEBUG) Core.logger.log(this, "Allocated thread for local connection:
"+
- t, Logger.DEBUG);
+ if(logDEBUG) {
+ String tname = "";
+ if (t != null) tname = t.toString();
+ Core.logger.log(this, "Allocated thread for local
connection: "+
+ tname, Logger.DEBUG);
+ }
} else {
if(logDEBUG) Core.logger.log(this, "Rejecting local connection",
Logger.DEBUG);
@@ -275,7 +279,7 @@
}
if((highRunningConnections > 0) && (runningConnections >
highRunningConnections) && isListening()) listen(false);
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "RunningConnections now "+
runningConnections+", listening = "+
isListening(), Logger.DEBUG);
@@ -318,7 +322,7 @@
&& !isListening()) {
listen(true);
}
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "RunningConnections now "+
runningConnections+", listening = "+
isListening(), Logger.DEBUG);
Index: LocalNIOInterface.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/interfaces/LocalNIOInterface.java,v
retrieving revision 1.3.2.1
retrieving revision 1.3.2.2
diff -u -w -r1.3.2.1 -r1.3.2.2
--- LocalNIOInterface.java 1 Jul 2003 02:27:14 -0000 1.3.2.1
+++ LocalNIOInterface.java 28 Oct 2003 20:20:33 -0000 1.3.2.2
@@ -1,3 +1,4 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.interfaces;
import freenet.thread.ThreadFactory;
import freenet.transport.tcpConnection;
@@ -148,14 +149,25 @@
protected void handleConnection(Connection conn) {
if(acceptingConnections) {
+ if(logDEBUG)
+ Core.logger.log(this, "Accepting connection
immediately: "+
+ conn, Logger.DEBUG);
+ if(conn != null)
realHandleConnection(conn);
synchronized(oldConnections) {
+ int i=0;
while(!oldConnections.isEmpty() && acceptingConnections) {
+ if(logDEBUG)
+ Core.logger.log(this, "Handling old
connection #"+i++,
+
Logger.DEBUG);
realHandleConnection((Connection)oldConnections.
removeFirst());
}
}
} else {
+ if(logDEBUG)
+ Core.logger.log(this, "Deferring connection "+conn,
+ Logger.DEBUG);
synchronized(oldConnections) {
int deleted = 0;
while(oldConnections.size() >= MAX_QUEUED_CONNECTIONS) {
@@ -165,7 +177,11 @@
if(deleted > 0)
Core.logger.log(this, "Dropped "+deleted+" old connections",
Logger.NORMAL);
+ if(conn != null)
oldConnections.addFirst(conn);
+ if(logDEBUG)
+ Core.logger.log(this, "Added "+conn+" -
"+oldConnections.size()+
+ " connections
queued", Logger.DEBUG);
}
}
}
@@ -173,9 +189,14 @@
protected void realHandleConnection(Connection conn) {
if(runner.needsThread()) {
Thread t = tf.getThread(new ConnectionShell(conn));
+ if(logDEBUG) {
+ String tname = "";
+ if (t != null) tname = t.toString();
Core.logger.log(this, "Allocated thread for local connection: "+
- t+":"+conn, Core.logger.DEBUG);
+ tname+":"+conn,
Core.logger.DEBUG);
+ }
} else {
+ if(logDEBUG)
Core.logger.log(this, "Running local connection in-thread: "+
conn, Logger.DEBUG);
try {
@@ -204,6 +225,8 @@
LinkedList oldConnections = new LinkedList();
volatile boolean acceptingConnections = true;
+ int runningConnections = 0; // number of connections running
+
protected class ConnectionShell implements Runnable {
protected final Connection conn;
@@ -211,7 +234,7 @@
protected ConnectionShell(Connection conn) {
this.conn = conn;
- synchronized(this) {
+ synchronized(LocalNIOInterface.this) {
uppedRC = true;
runningConnections++;
}
@@ -222,19 +245,15 @@
this, Logger.MINOR);
acceptingConnections = false;
}
- if(Core.logger.shouldLog(Core.logger.DEBUG))
+ if(logDEBUG)
Core.logger.log(this, "RunningConnections now "+
runningConnections+", listening = "+
isListening(), Core.logger.DEBUG);
}
protected void finalize() {
- synchronized(this) {
- if(uppedRC == true) {
- runningConnections--;
- uppedRC = false;
- }
- }
+ decrementRunningConnections();
+ // Successful return from handle() means it will eventually be
closed by handler
}
/** Main processing method for the ConnectionShell object */
@@ -255,21 +274,28 @@
e.printStackTrace(Core.logStream);
conn.close();
} finally {
- synchronized(this) {
+ decrementRunningConnections();
+ }
+ }
+
+ protected void decrementRunningConnections() {
+ synchronized(LocalNIOInterface.this) {
+ if(uppedRC == true) {
runningConnections--;
uppedRC = false;
- }
+ if(logDEBUG)
+ Core.logger.log(this,
"RunningConnections now "+
+
runningConnections+", listening = "+
+
isListening(), Core.logger.DEBUG);
if(runningConnections < lowRunningConnections
&& !acceptingConnections) {
Core.logger.log(this, "Restarting processing connections "+
this, Logger.MINOR);
acceptingConnections = true;
+ } else return;
+ } else return;
}
- if(Core.logger.shouldLog(Core.logger.DEBUG))
- Core.logger.log(this, "RunningConnections now "+
- runningConnections+", listening = "+
- isListening(), Core.logger.DEBUG);
- }
+ handleConnection(null);
}
}
}
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs