Author: nextgens
Date: 2006-07-12 13:59:29 +0000 (Wed, 12 Jul 2006)
New Revision: 9573
Modified:
trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
trunk/freenet/src/freenet/io/comm/DummyPeerContext.java
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/LocationManager.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Working n2n messenging support ... hopefully it won't cause any connectivity
problem.
Modified: trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java 2006-07-12
00:50:28 UTC (rev 9572)
+++ trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java 2006-07-12
13:59:29 UTC (rev 9573)
@@ -164,7 +164,7 @@
buf.append("PeerNode.hashCode
'"+request.getParam("hashcode")+"' not found.<br /><br />\n");
buf.append("</div>");
buf.append("</div>");
- } else if(!pn.isConnected() &&
!pn.isVerifiedIncompatibleNewerVersion() &&
!pn.isVerifiedIncompatibleOlderVersion()) {
+ } else if(!pn.isConnected()) {
ctx.getPageMaker().makeHead(buf, "Node To
Node Text Message Failed");
buf.append("<div class=\"infobox
infobox-error\">");
Modified: trunk/freenet/src/freenet/io/comm/DummyPeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DummyPeerContext.java 2006-07-12
00:50:28 UTC (rev 9572)
+++ trunk/freenet/src/freenet/io/comm/DummyPeerContext.java 2006-07-12
13:59:29 UTC (rev 9573)
@@ -22,8 +22,11 @@
// Do nothing
}
+ public boolean isReallyConnected() {
+ return false;
+ }
+
public boolean isConnected() {
return false;
}
-
}
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -13,6 +13,9 @@
/** Force the peer to disconnect */
void forceDisconnect();
- /** Is the peer connected? If we can't tell, return true. */
+ /** Is the peer connected? Have we established the session link? */
boolean isConnected();
+
+ /** Is the peer connected? are we able to route requests to it? */
+ boolean isReallyConnected();
}
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -654,7 +654,7 @@
MessageFilter mf = null;
for(int i=0;i<waiters.length;i++) {
AwaitingCompletion awc = waiters[i];
- if(!awc.pn.isConnected()) {
+ if(!awc.pn.isReallyConnected()) {
Logger.normal(this, "Disconnected:
"+awc.pn+" in "+CHKInsertSender.this);
continue;
}
@@ -686,7 +686,7 @@
}
if(noTimeLeft) {
for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isConnected()) continue;
+
if(!waiters[i].pn.isReallyConnected()) continue;
if(!waiters[i].completedTransfer) {
waiters[i].completedTransfer(false);
}
@@ -753,7 +753,7 @@
if(noTimeLeft) {
Logger.minor(this, "Overall
timeout on "+CHKInsertSender.this);
for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isConnected()) continue;
+
if(!waiters[i].pn.isReallyConnected()) continue;
if(!waiters[i].receivedCompletionNotice)
waiters[i].completed(false, false);
if(!waiters[i].completedTransfer)
@@ -778,7 +778,7 @@
boolean completedTransfers = true;
synchronized(nodesWaitingForCompletion) {
for(int i=0;i<waiters.length;i++) {
- if(!waiters[i].pn.isConnected())
continue;
+ if(!waiters[i].pn.isReallyConnected())
continue;
if(!waiters[i].completedTransfer) {
completedTransfers = false;
break;
@@ -794,7 +794,7 @@
}
completedTransfers = true;
for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isConnected()) continue;
+
if(!waiters[i].pn.isReallyConnected()) continue;
if(!waiters[i].completedTransfer) {
completedTransfers = false;
break;
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2006-07-12
00:50:28 UTC (rev 9572)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2006-07-12
13:59:29 UTC (rev 9573)
@@ -952,7 +952,7 @@
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending
messages, requeueing remaining messages");
// Requeue
- if(!dontRequeue)
+ if(pn.isReallyConnected() && !dontRequeue)
pn.requeueMessageItems(messages, lastIndex,
messages.length - lastIndex, false, "NotConnectedException(3)");
return;
} catch (WouldBlockException e) {
@@ -1019,7 +1019,13 @@
throw new IllegalArgumentException();
PeerNode pn = (PeerNode)peer;
byte[] newBuf = preformat(buf, offset, length);
+ try{
processOutgoingPreformatted(newBuf, 0, newBuf.length, pn, -1, null,
alreadyReportedBytes);
+ }catch (NotConnectedException e){
+ pn.invalidate();
+ pn.setPeerNodeStatus(System.currentTimeMillis());
+ throw e;
+ }
}
Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -124,7 +124,7 @@
PeerNode[] peers = node.peers.connectedPeers;
for(int i=0;i<peers.length;i++) {
PeerNode pn = peers[i];
- if(pn.isConnected()) {
+ if(pn.isReallyConnected()) {
double ploc =
pn.getLocation().getValue();
if(ploc == myLoc) {
myFlag = true;
@@ -386,7 +386,7 @@
}
if(reply == null) {
- if(pn.isConnected() && (System.currentTimeMillis() -
pn.timeLastConnected() > TIMEOUT*2)) {
+ if(pn.isReallyConnected() && (System.currentTimeMillis() -
pn.timeLastConnected() > TIMEOUT*2)) {
// Timed out! Abort...
Logger.error(this, "Timed out waiting for
SwapRejected/SwapReply on "+uid);
}
@@ -421,7 +421,7 @@
}
if(reply == null) {
- if(pn.isConnected() && (System.currentTimeMillis() -
pn.timeLastConnected() > TIMEOUT*2)) {
+ if(pn.isReallyConnected() && (System.currentTimeMillis() -
pn.timeLastConnected() > TIMEOUT*2)) {
// Hrrrm!
Logger.error(this, "Timed out waiting for SwapComplete
- malicious node?? on "+uid);
}
@@ -923,6 +923,7 @@
* We lost the connection to a node, or it was restarted.
*/
public void lostOrRestartedNode(PeerNode pn) {
+ if(!pn.isReallyConnected()) return;
Vector v = new Vector();
synchronized(recentlyForwardedIDs) {
Enumeration e = recentlyForwardedIDs.keys();
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -50,7 +50,32 @@
Logger.minor(this, "Lost connection replying to "+m);
}
return true;
+ }else if(spec == DMT.FNPLinkPing) {
+ long id = m.getLong(DMT.PING_SEQNO);
+ Message msg = DMT.createFNPLinkPong(id);
+ try {
+ source.sendAsync(msg, null, 0, null);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ return true;
+ } else if(spec == DMT.FNPLinkPong) {
+ long id = m.getLong(DMT.PING_SEQNO);
+ source.receivedLinkPong(id);
+ return true;
+ } else if(spec == DMT.FNPDetectedIPAddress) {
+ Peer p = (Peer) m.getObject(DMT.EXTERNAL_ADDRESS);
+ source.setRemoteDetectedPeer(p);
+ node.redetectAddress();
+ } else if(spec == DMT.FNPVoid) {
+ return true;
+ } else if(spec == DMT.nodeToNodeTextMessage) {
+ node.receivedNodeToNodeTextMessage(m);
+ return true;
}
+
+ if(!source.isReallyConnected()) return false;
+
if(spec == DMT.FNPLocChangeNotification) {
double newLoc = m.getDouble(DMT.LOCATION);
source.updateLocation(newLoc);
@@ -79,29 +104,7 @@
return handleInsertRequest(m, false);
} else if(spec == DMT.FNPSSKInsertRequest) {
return handleInsertRequest(m, true);
- } else if(spec == DMT.FNPLinkPing) {
- long id = m.getLong(DMT.PING_SEQNO);
- Message msg = DMT.createFNPLinkPong(id);
- try {
- source.sendAsync(msg, null, 0, null);
- } catch (NotConnectedException e) {
- // Ignore
- }
- return true;
- } else if(spec == DMT.FNPLinkPong) {
- long id = m.getLong(DMT.PING_SEQNO);
- source.receivedLinkPong(id);
- return true;
- } else if(spec == DMT.FNPDetectedIPAddress) {
- Peer p = (Peer) m.getObject(DMT.EXTERNAL_ADDRESS);
- source.setRemoteDetectedPeer(p);
- node.redetectAddress();
- } else if(spec == DMT.FNPVoid) {
- return true;
- } else if(spec == DMT.nodeToNodeTextMessage) {
- node.receivedNodeToNodeTextMessage(m);
- return true;
- }
+ }
return false;
}
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -153,8 +153,11 @@
Math.max(pn.lastReceivedPacketTime(),
lastReceivedPacketFromAnyNode);
if(pn.isConnected()) {
- if(pn.shouldDisconnectNow()) {
- pn.forceDisconnect();
+ if(pn.isReallyConnected() && pn.shouldDisconnectNow()) {
+ // we don't disconnect but we mark it incompatible
+ pn.invalidate();
+ pn.setPeerNodeStatus(now);
+ Logger.normal(this, "shouldDisconnectNow has returned
true : marking the peer as incompatible");
continue;
}
@@ -175,7 +178,7 @@
try {
pn.sendAnyUrgentNotifications();
} catch (PacketSequenceException e) {
- Logger.error(this, "Caught "+e+" - disconnecting", e);
+ Logger.error(this, "Caught "+e+" - while sending urgent
notifications : disconnecting", e);
pn.forceDisconnect();
}
} else {
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -133,7 +133,7 @@
// removing from connectedPeers
ArrayList a = new ArrayList();
for(int i=0;i<myPeers.length;i++) {
- if((myPeers[i]!=pn) && myPeers[i].isConnected())
+ if((myPeers[i]!=pn) && myPeers[i].isReallyConnected())
a.add(myPeers[i]);
}
@@ -168,7 +168,7 @@
// removing from connectedPeers
ArrayList a = new ArrayList();
for(int i=0;i<myPeers.length;i++) {
- if((myPeers[i]!=pn) && myPeers[i].isConnected())
+ if((myPeers[i]!=pn) &&
myPeers[i].isReallyConnected())
a.add(myPeers[i]);
}
PeerNode[] newConnectedPeers = new PeerNode[a.size()];
@@ -180,8 +180,8 @@
}
public void addConnectedPeer(PeerNode pn) {
- if(!pn.isConnected()) {
- Logger.minor(this, "Not connected: "+pn);
+ if(!pn.isReallyConnected()) {
+ Logger.minor(this, "Not ReallyConnected: "+pn);
return;
}
synchronized(this) {
@@ -272,7 +272,7 @@
locs = new double[connectedPeers.length];
int x = 0;
for(int i=0;i<conns.length;i++) {
- if(conns[i].isConnected())
+ if(conns[i].isReallyConnected())
locs[x++] = conns[i].getLocation().getValue();
}
// Wipe out any information contained in the order
@@ -294,7 +294,7 @@
for(int i=0;i<5;i++) {
PeerNode pn =
connectedPeers[node.random.nextInt(connectedPeers.length)];
if(pn == exclude) continue;
- if(pn.isConnected()) return pn;
+ if(pn.isReallyConnected()) return pn;
}
// None of them worked
// Move the un-connected ones out
@@ -304,14 +304,14 @@
for(int i=0;i<myPeers.length;i++) {
PeerNode pn = myPeers[i];
if(pn == exclude) continue;
- if(pn.isConnected()) {
+ if(pn.isReallyConnected()) {
v.add(pn);
} else {
Logger.minor(this, "Excluding "+pn+" because is disconnected");
}
}
int lengthWithoutExcluded = v.size();
- if((exclude != null) && exclude.isConnected())
+ if((exclude != null) && exclude.isReallyConnected())
v.add(exclude);
PeerNode[] newConnectedPeers = new PeerNode[v.size()];
newConnectedPeers = (PeerNode[]) v.toArray(newConnectedPeers);
@@ -327,7 +327,7 @@
public void localBroadcast(Message msg) {
PeerNode[] peers = connectedPeers; // avoid synchronization
for(int i=0;i<peers.length;i++) {
- if(peers[i].isConnected()) try {
+ if(peers[i].isReallyConnected()) try {
peers[i].sendAsync(msg, null, 0, null);
} catch (NotConnectedException e) {
// Ignore
@@ -348,7 +348,7 @@
PeerNode best = null;
for(int i=0;i<peers.length;i++) {
PeerNode p = peers[i];
- if(!p.isConnected()) continue;
+ if(!p.isReallyConnected()) continue;
double diff = distance(p, loc);
if(diff < bestDiff) {
best = p;
@@ -423,7 +423,7 @@
Logger.minor(this, "Skipping (req came from): "+p.getPeer());
continue;
}
- if(!p.isConnected()) {
+ if(!p.isReallyConnected()) {
Logger.minor(this, "Skipping (not connected): "+p.getPeer());
continue;
}
@@ -580,7 +580,7 @@
public boolean anyConnectedPeers() {
PeerNode[] conns = connectedPeers;
for(int i=0;i<conns.length;i++) {
- if(conns[i].isConnected()) return true;
+ if(conns[i].isReallyConnected()) return true;
}
return false;
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2006-07-12 00:50:28 UTC
(rev 9572)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2006-07-12 13:59:29 UTC
(rev 9573)
@@ -680,9 +680,13 @@
* Note possible deadlocks! PeerManager calls this, we call
* PeerManager in e.g. verified.
*/
- public boolean isConnected() {
+ public boolean isReallyConnected() {
return isConnected;
}
+
+ public boolean isConnected(){
+ return (isConnected) || (completedHandshake &&
(verifiedIncompatibleNewerVersion || verifiedIncompatibleOlderVersion));
+ }
/**
* Send a message, off-thread, to this node.
@@ -694,7 +698,7 @@
*/
public void sendAsync(Message msg, AsyncMessageCallback cb, int
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException {
Logger.minor(this, "Sending async: "+msg+" : "+cb+" on "+this);
- if(!isConnected) throw new NotConnectedException();
+ if(!isConnected()) throw new NotConnectedException();
MessageItem item = new MessageItem(msg, cb == null ? null : new
AsyncMessageCallback[] {cb}, alreadyReportedBytes, ctr);
synchronized(routingBackoffSync) {
reportBackoffStatus(System.currentTimeMillis());
@@ -732,6 +736,7 @@
synchronized(this) {
// Force renegotiation.
isConnected = false;
+ completedHandshake = false;
setPeerNodeStatus(now);
// Prevent sending packets to the node until that happens.
if(currentTracker != null)
@@ -829,7 +834,7 @@
*/
public boolean shouldSendHandshake() {
long now = System.currentTimeMillis();
- return (!isConnected) &&
+ return (!isConnected()) &&
(!isDisabled) && // don't connect to disabled
peers
(!isListenOnly) && // don't send handshake
requests to isListenOnly peers
(handshakeIPs != null) &&
@@ -1049,7 +1054,7 @@
* can be used in handshaking when the connection hasn't been verified yet.
*/
synchronized void receivedPacket(boolean dontLog) throws
NotConnectedException {
- if((isConnected == false) && !dontLog) {
+ if(!isConnected() && !dontLog) {
if((unverifiedTracker == null) && (currentTracker == null)) {
Logger.error(this, "Received packet while
disconnected!: "+this, new Exception("error"));
throw new NotConnectedException();
@@ -1113,31 +1118,25 @@
Logger.normal(this, "Not connecting to "+this+" -
reverse invalid version "+Version.getVersionString()+" for peer's
lastGoodversion: "+lastGoodVersion);
verifiedIncompatibleNewerVersion = true;
isConnected = false;
- setPeerNodeStatus(now);
node.peers.disconnected(this);
- return false;
} else {
verifiedIncompatibleNewerVersion = false;
- setPeerNodeStatus(now);
}
if(invalidVersion()) {
Logger.normal(this, "Not connecting to "+this+" -
invalid version "+version);
verifiedIncompatibleOlderVersion = true;
isConnected = false;
- setPeerNodeStatus(now);
node.peers.disconnected(this);
- return false;
} else {
verifiedIncompatibleOlderVersion = false;
- setPeerNodeStatus(now);
}
+
+ setPeerNodeStatus(now);
KeyTracker newTracker = new KeyTracker(this, encCipher, encKey);
changedIP(replyTo);
if(thisBootID != this.bootID) {
connectedTime = System.currentTimeMillis();
Logger.minor(this, "Changed boot ID from "+bootID+" to
"+thisBootID+" for "+getPeer());
- isConnected = false; // Will be reset below
- setPeerNodeStatus(now);
if(previousTracker != null) {
KeyTracker old = previousTracker;
previousTracker = null;
@@ -1164,7 +1163,6 @@
unverifiedTracker = null;
if(previousTracker != null)
previousTracker.deprecated();
- isConnected = true;
neverConnected = false;
peerAddedTime = 0; // don't store anymore
setPeerNodeStatus(now);
@@ -1196,7 +1194,8 @@
else return;
if(unverifiedTracker != null) return;
}
- sendInitialMessages();
+ if(isReallyConnected())
+ sendInitialMessages();
}
/**
@@ -1207,7 +1206,7 @@
Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);
try {
- sendAsync(locMsg, null, 0, null);
+ sendAsync(locMsg, null, 0, null);
sendAsync(ipMsg, null, 0, null);
} catch (NotConnectedException e) {
Logger.error(this, "Completed handshake with "+getPeer()+" but
disconnected!!!", new Exception("error"));
@@ -1461,8 +1460,15 @@
}
public String getStatus() {
+ String status;
+ if(isConnected)
+ status = new String("CONNECTED");
+ else if (isConnected())
+ status = new String("INCOMPATIBLE");
+ else
+ status = new String("DISCONNECTED");
return
- (isConnected ? "CONNECTED " : "DISCONNECTED") + " " +
getPeer()+" "+myName+" "+currentLocation.getValue()+" "+getVersion()+" backoff:
"+routingBackoffLength+" ("+(Math.max(routingBackedOffUntil -
System.currentTimeMillis(),0))+")";
+ status + " " + getPeer()+" "+myName+"
"+currentLocation.getValue()+" "+getVersion()+" backoff:
"+routingBackoffLength+" ("+(Math.max(routingBackedOffUntil -
System.currentTimeMillis(),0))+")";
}
public String getTMCIPeerInfo() {
@@ -1772,6 +1778,7 @@
public void receivedLinkPong(long id) {
Long lid = new Long(id);
long startTime;
+ long now = System.currentTimeMillis();
synchronized(pingSync) {
Long s = (Long) pingsSentTimes.get(lid);
if(s == null) {
@@ -1780,12 +1787,13 @@
}
startTime = s.longValue();
pingsSentTimes.removeKey(lid);
- long now = System.currentTimeMillis();
pingAverage.report(now - startTime);
Logger.minor(this, "Reporting ping time to "+this+" :
"+(now - startTime));
}
- if(verifiedIncompatibleOlderVersion)
- forceDisconnect();
+ if(!isReallyConnected()){
+ invalidate();
+ setPeerNodeStatus(now);
+ }
}
public double averagePingTime() {
@@ -2063,9 +2071,16 @@
*/
public synchronized boolean shouldDisconnectNow() {
verifiedIncompatibleOlderVersion = invalidVersion();
- return verifiedIncompatibleOlderVersion;
+ verifiedIncompatibleNewerVersion = reverseInvalidVersion();
+ if(verifiedIncompatibleNewerVersion ||
verifiedIncompatibleOlderVersion) return true;
+ return false;
}
+ protected synchronized void invalidate(){
+ isConnected = false;
+ Logger.normal(this, "Invalidated "+this);
+ }
+
public boolean allowLocalAddresses() {
return allowLocalAddresses;
}