Author: toad
Date: 2007-06-27 21:00:20 +0000 (Wed, 27 Jun 2007)
New Revision: 13784
Added:
trunk/freenet/src/freenet/io/comm/MessageCore.java
trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java
trunk/freenet/src/freenet/io/comm/SocketHandler.java
trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
Removed:
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
Modified:
trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
trunk/freenet/src/freenet/io/comm/MessageFilter.java
trunk/freenet/src/freenet/io/comm/PeerContext.java
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeIPDetector.java
trunk/freenet/src/freenet/node/NodeStats.java
trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java
trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java
Log:
Refactor UdpSocketManager into MessageCore and UdpSocketHandler (one per
socket/port, we will have multiple sockets/ports with opennet).
Useful for opennet, also useful for alternate transports later on.
Untested.
Modified: trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -10,7 +10,7 @@
import java.util.Iterator;
import freenet.client.HighLevelSimpleClient;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.MessageCore;
import freenet.l10n.L10n;
import freenet.node.Node;
import freenet.node.NodeClientCore;
@@ -27,7 +27,7 @@
private NodeClientCore core;
- private UdpSocketManager usm;
+ private MessageCore usm;
protected N2NTMToadlet(Node n, NodeClientCore core,
HighLevelSimpleClient client) {
Copied: trunk/freenet/src/freenet/io/comm/MessageCore.java (from rev 13775,
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java)
===================================================================
--- trunk/freenet/src/freenet/io/comm/MessageCore.java
(rev 0)
+++ trunk/freenet/src/freenet/io/comm/MessageCore.java 2007-06-27 21:00:20 UTC
(rev 13784)
@@ -0,0 +1,494 @@
+/*
+ * Dijjer - A Peer to Peer HTTP Cache
+ * Copyright (C) 2004,2005 Change.Tv, Inc
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package freenet.io.comm;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Vector;
+
+import freenet.node.PeerNode;
+import freenet.node.Ticker;
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+public class MessageCore {
+
+ public static final String VERSION = "$Id: MessageCore.java,v 1.22
2005/08/25 17:28:19 amphibian Exp $";
+ private static boolean logMINOR;
+ private Dispatcher _dispatcher;
+ /** _filters serves as lock for both */
+ private final LinkedList _filters = new LinkedList();
+ private final LinkedList _unclaimed = new LinkedList();
+ private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
+ private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME =
60*60*1000; // 1 hour
+ // Every second, remove all timed out filters
+ private static final int FILTER_REMOVE_TIME = 1000;
+
+ public MessageCore() {
+ _timedOutFilters = new Vector(32);
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ }
+
+ /**
+ * Decode a packet from data and a peer.
+ * Can be called by IncomingPacketFilter's.
+ * @param data
+ * @param offset
+ * @param length
+ * @param peer
+ */
+ public Message decodeSingleMessage(byte[] data, int offset, int length,
PeerContext peer, int overhead) {
+ try {
+ return Message.decodeMessageFromPacket(data, offset, length, peer,
overhead);
+ } catch (Throwable t) {
+ Logger.error(this, "Could not decode packet: "+t, t);
+ return null;
+ }
+ }
+
+ /** Only used by removeTimedOutFilters() - if future code uses this
elsewhere, we need to
+ * reconsider its locking. */
+ private final Vector _timedOutFilters;
+
+ public void start(final Ticker ticker) {
+ ticker.queueTimedJob(new Runnable() {
+
+ public void run() {
+ try {
+ removeTimedOutFilters();
+ } catch (Throwable t) {
+ Logger.error(this, "Failed to remove
timed out filters: "+t, t);
+ } finally {
+ ticker.queueTimedJob(this,
FILTER_REMOVE_TIME);
+ }
+ // TODO Auto-generated method stub
+
+ }
+
+ }, FILTER_REMOVE_TIME);
+ }
+
+ /**
+ * Remove timed out filters.
+ */
+ void removeTimedOutFilters() {
+ long tStart = System.currentTimeMillis();
+ synchronized (_filters) {
+ for (ListIterator i = _filters.listIterator();
i.hasNext();) {
+ MessageFilter f = (MessageFilter) i.next();
+ if (f.timedOut()) {
+ i.remove();
+ _timedOutFilters.add(f);
+ } else { // Because _filters are in order of
timeout, we
+ // can abort the iteration as soon as
we find one that
+ // doesn't timeout
+ break;
+ }
+ }
+ }
+
+ for(int i=0;i<_timedOutFilters.size();i++) {
+ MessageFilter f = (MessageFilter)
_timedOutFilters.get(i);
+ f.setMessage(null);
+ f.onTimedOut();
+ }
+ _timedOutFilters.clear();
+
+ long tEnd = System.currentTimeMillis();
+ if(tEnd - tStart > 50) {
+ if(tEnd - tStart > 3000)
+ Logger.error(this, "removeTimedOutFilters took
"+(tEnd-tStart)+"ms");
+ else
+ if(logMINOR) Logger.minor(this,
"removeTimedOutFilters took "+(tEnd-tStart)+"ms");
+ }
+ }
+
+ /**
+ * Dispatch a message to a waiting filter, or feed it to the
+ * Dispatcher if none are found.
+ * @param m The Message to dispatch.
+ */
+ public void checkFilters(Message m, PacketSocketHandler from) {
+ long tStart = System.currentTimeMillis();
+ if(logMINOR) Logger.minor(this, "checkFilters: "+m+" from
"+m.getSource());
+ if ((m.getSource()) instanceof PeerNode)
+ {
+
((PeerNode)m.getSource()).addToLocalNodeReceivedMessagesFromStatistic(m);
+ }
+ boolean matched = false;
+ if ((!(m.getSpec().equals(DMT.packetTransmit))) && logMINOR) {
+ if ((m.getSpec().equals(DMT.ping) ||
m.getSpec().equals(DMT.pong)) && Logger.shouldLog(Logger.DEBUG, this)) {
+ Logger.debug(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + from + " <- "
+ + m.getSource() + " : " + m);
+ } else {
+ if(logMINOR) Logger.minor(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + from + " <- "
+ + m.getSource() + " : " + m);
+ }
+ }
+ MessageFilter match = null;
+ synchronized (_filters) {
+ for (ListIterator i = _filters.listIterator();
i.hasNext();) {
+ MessageFilter f = (MessageFilter) i.next();
+ if (f.match(m)) {
+ matched = true;
+ i.remove();
+ match = f;
+ if(logMINOR) Logger.minor(this,
"Matched: "+f);
+ break; // Only one match permitted per
message
+ }
+ }
+ }
+ if(match != null) {
+ match.setMessage(m);
+ match.onMatched();
+ }
+ // Feed unmatched messages to the dispatcher
+ if ((!matched) && (_dispatcher != null)) {
+ try {
+ if(logMINOR) Logger.minor(this, "Feeding to dispatcher:
"+m);
+ matched = _dispatcher.handleMessage(m);
+ } catch (Throwable t) {
+ Logger.error(this, "Dispatcher threw "+t, t);
+ }
+ }
+ // Keep the last few _unclaimed messages around in case the
intended receiver isn't receiving yet
+ if (!matched) {
+ if(logMINOR) Logger.minor(this, "Unclaimed: "+m);
+ /** Check filters and then add to _unmatched is ATOMIC
+ * It has to be atomic, because otherwise we can get a
+ * race condition that results in timeouts on MFs.
+ *
+ * Specifically:
+ * - Thread A receives packet
+ * - Thread A checks filters. It doesn't match any.
+ * - Thread A feeds to Dispatcher.
+ * - Thread B creates filter.
+ * - Thread B checks _unmatched.
+ * - Thread B adds filter.
+ * - Thread B sleeps.
+ * - Thread A returns from Dispatcher. Which didn't match.
+ * - Thread A adds to _unmatched.
+ *
+ * OOPS!
+ * The only way to fix this is to have checking the
+ * filters and unmatched be a single atomic operation.
+ * Another race is possible if we merely recheck the
+ * filters after we return from dispatcher, for example.
+ */
+ synchronized (_filters) {
+ if(logMINOR) Logger.minor(this, "Rechecking
filters and adding message");
+ for (ListIterator i = _filters.listIterator();
i.hasNext();) {
+ MessageFilter f = (MessageFilter)
i.next();
+ if (f.match(m)) {
+ matched = true;
+ match = f;
+ i.remove();
+ if(logMINOR) Logger.minor(this,
"Matched: "+f);
+ break; // Only one match
permitted per message
+ }
+ }
+ if(!matched) {
+ while (_unclaimed.size() >
MAX_UNMATCHED_FIFO_SIZE) {
+ Message removed =
(Message)_unclaimed.removeFirst();
+ long messageLifeTime =
System.currentTimeMillis() - removed.localInstantiationTime;
+ if ((removed.getSource()) instanceof
PeerNode) {
+ Logger.normal(this, "Dropping
unclaimed from "+removed.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
+ } else {
+ Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+"
(quantity)"+": "+removed);
+ }
+ }
+ _unclaimed.addLast(m);
+ if(logMINOR) Logger.minor(this, "Done");
+ }
+ }
+ if(match != null) {
+ match.setMessage(m);
+ match.onMatched();
+ }
+ }
+ long tEnd = System.currentTimeMillis();
+ if(tEnd - tStart > 50) {
+ if(tEnd - tStart > 3000)
+ Logger.error(this, "checkFilters took
"+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for
matched: "+matched);
+ else
+ if(logMINOR) Logger.minor(this, "checkFilters
took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for
matched: "+matched);
+ }
+ }
+
+ /** IncomingPacketFilter should call this when a node is disconnected.
*/
+ public void onDisconnect(PeerContext ctx) {
+ Vector droppedFilters = null; // rare operation, we can waste
objects for better locking
+ synchronized(_filters) {
+ ListIterator i = _filters.listIterator();
+ while (i.hasNext()) {
+ MessageFilter f = (MessageFilter) i.next();
+ if(f.matchesDroppedConnection(ctx)) {
+ if(droppedFilters == null)
+ droppedFilters = new Vector();
+ droppedFilters.add(f);
+ i.remove();
+ }
+ }
+ }
+ if(droppedFilters != null) {
+ for(int i=0;i<droppedFilters.size();i++) {
+ MessageFilter mf = (MessageFilter)
droppedFilters.get(i);
+ mf.onDroppedConnection(ctx);
+ }
+ }
+ }
+
+ /** IncomingPacketFilter should call this when a node connects with a
new boot ID */
+ public void onRestart(PeerContext ctx) {
+ Vector droppedFilters = null; // rare operation, we can waste
objects for better locking
+ synchronized(_filters) {
+ ListIterator i = _filters.listIterator();
+ while (i.hasNext()) {
+ MessageFilter f = (MessageFilter) i.next();
+ if(f.matchesRestartedConnection(ctx)) {
+ if(droppedFilters == null)
+ droppedFilters = new Vector();
+ droppedFilters.add(f);
+ i.remove();
+ }
+ }
+ }
+ if(droppedFilters != null) {
+ for(int i=0;i<droppedFilters.size();i++) {
+ MessageFilter mf = (MessageFilter)
droppedFilters.get(i);
+ mf.onRestartedConnection(ctx);
+ }
+ }
+ }
+
+ public void addAsyncFilter(MessageFilter filter,
AsyncMessageFilterCallback callback) throws DisconnectedException {
+ filter.setAsyncCallback(callback);
+ filter.onStartWaiting();
+ boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+ if(logDEBUG) Logger.debug(this, "Adding async filter "+filter+"
for "+callback);
+ Message ret = null;
+ if(filter._source != null && (!filter._source.isConnected()) &&
+ filter.matchesDroppedConnection(filter._source))
+ throw new DisconnectedException();
+ // Check to see whether the filter matches any of the recently
_unclaimed messages
+ // Drop any _unclaimed messages that the filter doesn't match
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
+ long now = System.currentTimeMillis();
+ long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
+ long messageLifeTime = 0;
+ synchronized (_filters) {
+ if(logMINOR) Logger.minor(this, "Checking _unclaimed");
+ for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
+ Message m = (Message) i.next();
+ if (filter.match(m)) {
+ i.remove();
+ ret = m;
+ if(logMINOR) Logger.debug(this,
"Matching from _unclaimed");
+ break;
+ } else if (m.localInstantiationTime <
messageDropTime) {
+ i.remove();
+ messageLifeTime = now -
m.localInstantiationTime;
+ if ((m.getSource()) instanceof
PeerNode) {
+ Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
+ } else {
+ Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+":
"+m);
+ }
+ }
+ }
+ if (ret == null) {
+ if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
+ // Insert filter into filter list in order of
timeout
+ ListIterator i = _filters.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added at end");
+ break;
+ }
+ MessageFilter mf = (MessageFilter)
i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
+ }
+ }
+ }
+ if(ret != null) {
+ filter.onMatched();
+ filter.clearMatched();
+ }
+ }
+
+ /**
+ * Wait for a filter to trigger, or timeout. Blocks until either the
trigger is activated, or it times
+ * out, or the peer is disconnected.
+ * @param filter The filter to wait for.
+ * @param ctr Byte counter to add bytes from the message to.
+ * @return Either a message, or null if the filter timed out.
+ * @throws DisconnectedException If the single peer being waited for
disconnects.
+ */
+ public Message waitFor(MessageFilter filter, ByteCounter ctr) throws
DisconnectedException {
+ boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+ if(logDEBUG) Logger.debug(this, "Waiting for "+filter);
+ long startTime = System.currentTimeMillis();
+ filter.onStartWaiting();
+ Message ret = null;
+ if(filter._source != null && (!filter._source.isConnected()) &&
+ filter.matchesDroppedConnection(filter._source))
+ throw new DisconnectedException();
+ // Check to see whether the filter matches any of the recently
_unclaimed messages
+ // Drop any _unclaimed messages that the filter doesn't match
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
+ long now = System.currentTimeMillis();
+ long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
+ long messageLifeTime = 0;
+ synchronized (_filters) {
+ if(logMINOR) Logger.minor(this, "Checking _unclaimed");
+ for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
+ Message m = (Message) i.next();
+ if (filter.match(m)) {
+ i.remove();
+ ret = m;
+ if(logMINOR) Logger.debug(this,
"Matching from _unclaimed");
+ break;
+ } else if (m.localInstantiationTime <
messageDropTime) {
+ i.remove();
+ messageLifeTime = now -
m.localInstantiationTime;
+ if ((m.getSource()) instanceof
PeerNode) {
+ Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
+ } else {
+ Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+":
"+m);
+ }
+ }
+ }
+ if (ret == null) {
+ if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
+ // Insert filter into filter list in order of
timeout
+ ListIterator i = _filters.listIterator();
+ while (true) {
+ if (!i.hasNext()) {
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added at end");
+ break;
+ }
+ MessageFilter mf = (MessageFilter)
i.next();
+ if (mf.getTimeout() >
filter.getTimeout()) {
+ i.previous();
+ i.add(filter);
+ if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
+ break;
+ }
+ }
+ }
+ }
+ long tEnd = System.currentTimeMillis();
+ if(tEnd - now > 50) {
+ if(tEnd - now > 3000)
+ Logger.error(this, "waitFor _unclaimed
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+"
for ret of "+ret);
+ else
+ if(logMINOR) Logger.minor(this, "waitFor
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of
"+_unclaimed.size()+" for ret of "+ret);
+ }
+ // Unlock to wait on filter
+ // Waiting on the filter won't release the outer lock
+ // So we have to release it here
+ if(ret == null) {
+ if(logMINOR) Logger.minor(this, "Waiting...");
+ synchronized (filter) {
+ try {
+ // Precaution against filter getting
matched between being added to _filters and
+ // here - bug discovered by Mason
+ boolean fmatched = false;
+ while(!(fmatched = (filter.matched() ||
(filter.droppedConnection() != null)))) {
+ long wait =
filter.getTimeout()-System.currentTimeMillis();
+ if(wait > 0)
+ filter.wait(wait);
+ else break;
+ }
+ if(filter.droppedConnection() != null)
+ throw new DisconnectedException();
+ if(logMINOR) Logger.minor(this, "Matched:
"+fmatched);
+ } catch (InterruptedException e) {
+ }
+ ret = filter.getMessage();
+ filter.clearMatched();
+ }
+ if(logDEBUG) Logger.debug(this, "Returning "+ret+" from
"+filter);
+ } else {
+ // Matched an unclaimed packet
+ filter.onMatched();
+ filter.clearMatched();
+ }
+ // Probably get rid...
+// if (Dijjer.getDijjer().getDumpMessageWaitTimes() != null) {
+//
Dijjer.getDijjer().getDumpMessageWaitTimes().println(filter.toString() + "\t" +
filter.getInitialTimeout() + "\t"
+// + (System.currentTimeMillis() -
startTime));
+// Dijjer.getDijjer().getDumpMessageWaitTimes().flush();
+// }
+ long endTime = System.currentTimeMillis();
+ if(logDEBUG) Logger.debug(this, "Returning in
"+(endTime-startTime)+"ms");
+ if((ctr != null) && (ret != null))
+ ctr.receivedBytes(ret._receivedByteCount);
+ return ret;
+ }
+
+ /**
+ * Send a Message to a PeerContext.
+ * @throws NotConnectedException If we are not currently connected to
the node.
+ */
+ public void send(PeerContext destination, Message m, ByteCounter ctr)
throws NotConnectedException {
+ if(m.getSpec().isInternalOnly()) {
+ Logger.error(this, "Trying to send internal-only message "+m+"
of spec "+m.getSpec(), new Exception("debug"));
+ return;
+ }
+ destination.sendAsync(m, null, 0, ctr);
+ }
+
+ public void setDispatcher(Dispatcher d) {
+ _dispatcher = d;
+ }
+
+ /**
+ * @return the number of received messages that are currently unclaimed
+ */
+ public int getUnclaimedFIFOSize() {
+ synchronized (_filters){
+ return _unclaimed.size();
+ }
+ }
+
+ public Map getUnclaimedFIFOMessageCounts() {
+ Map messageCounts = new HashMap();
+ synchronized(_filters) {
+ for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
+ Message m = (Message) i.next();
+ String messageName = m.getSpec().getName();
+ Integer messageCount = (Integer)
messageCounts.get(messageName);
+ if (messageCount == null) {
+ messageCounts.put(messageName, new
Integer(1) );
+ } else {
+ messageCount = new
Integer(messageCount.intValue() + 1);
+ messageCounts.put(messageName,
messageCount );
+ }
+ }
+ }
+ return messageCounts;
+ }
+}
Modified: trunk/freenet/src/freenet/io/comm/MessageFilter.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/MessageFilter.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/comm/MessageFilter.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -166,6 +166,7 @@
}
public boolean match(Message m) {
+ if(timedOut()) return false;
if ((_or != null) && (_or.match(m))) {
_matched = true;
return true;
Added: trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java
(rev 0)
+++ trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -0,0 +1,32 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.io.comm;
+
+import freenet.io.comm.Peer.LocalAddressException;
+
+/**
+ * Base class for UdpSocketHandler and any other datagram-based transports.
+ */
+public interface PacketSocketHandler extends SocketHandler {
+
+ /** The maximum size of a packet, not including transport layer headers
*/
+ int getMaxPacketSize();
+
+ /**
+ * Send a block of encoded bytes to a peer. This is called by
+ * send, and by IncomingPacketFilter.processOutgoing(..).
+ * @param blockToSend The data block to send.
+ * @param destination The peer to send it to.
+ */
+ public void sendPacket(byte[] blockToSend, Peer destination, boolean
allowLocalAddresses) throws LocalAddressException;
+
+ /**
+ * Get the size of the transport layer headers, for byte accounting
purposes.
+ */
+ int getHeadersLength();
+
+ /** Set the decryption filter to which incoming packets will be fed */
+ public void setLowLevelFilter(IncomingPacketFilter f);
+
+}
Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java 2007-06-27 17:55:09 UTC
(rev 13783)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java 2007-06-27 21:00:20 UTC
(rev 13784)
@@ -4,6 +4,7 @@
package freenet.io.comm;
import freenet.io.xfer.PacketThrottle;
+import freenet.node.OutgoingPacketMangler;
/**
* @author amphibian
@@ -36,4 +37,10 @@
/** Get the PacketThrottle for the node's current address for the
standard packet size (if the
* address changes then we get a new throttle). */
public PacketThrottle getThrottle();
+
+ /** Get the SocketHandler which handles incoming packets from this node
*/
+ SocketHandler getSocketHandler();
+
+ /** Get the OutgoingPacketMangler which encrypts outgoing packets to
this node */
+ OutgoingPacketMangler getOutgoingMangler();
}
Added: trunk/freenet/src/freenet/io/comm/SocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/SocketHandler.java
(rev 0)
+++ trunk/freenet/src/freenet/io/comm/SocketHandler.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -0,0 +1,14 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.io.comm;
+
+/**
+ * Base class for all transports. We have a single object of this type for
both incoming and
+ * outgoing packets, but multiple instances for different instances of the
transport e.g. on
+ * different ports, with different crypto backends etc.
+ * @author toad
+ */
+public interface SocketHandler {
+
+}
Added: trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
(rev 0)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -0,0 +1,367 @@
+package freenet.io.comm;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.Random;
+
+import org.tanukisoftware.wrapper.WrapperManager;
+
+import freenet.io.comm.Peer.LocalAddressException;
+import freenet.node.LoggingConfigHandler;
+import freenet.node.Node;
+import freenet.support.FileLoggerHook;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+
+public class UdpSocketHandler extends Thread implements PacketSocketHandler {
+
+ private final DatagramSocket _sock;
+ private final InetAddress _bindTo;
+ private IncomingPacketFilter lowLevelFilter;
+ /** RNG for debugging, used with _dropProbability.
+ * NOT CRYPTO SAFE. DO NOT USE FOR THINGS THAT NEED CRYPTO SAFE RNG!
+ */
+ private Random dropRandom;
+ /** If >0, 1 in _dropProbability chance of dropping a packet; for
debugging */
+ private int _dropProbability;
+ // Icky layer violation, but we need to know the Node to work around
the EvilJVMBug.
+ private final Node node;
+ private static boolean logMINOR;
+ private volatile int lastTimeInSeconds;
+ private boolean _isDone;
+
+ public UdpSocketHandler(int listenPort, InetAddress bindto, Node node)
throws SocketException {
+ super("MessageCore packet receiver thread on port " +
listenPort);
+ this.node = node;
+ _bindTo = bindto;
+ // Keep the Updater code in, just commented out, for now
+ // We may want to be able to do on-line updates.
+// if (Updater.hasResource()) {
+// _sock = (DatagramSocket) Updater.getResource();
+// } else {
+ _sock = new DatagramSocket(listenPort, bindto);
+ int sz = _sock.getReceiveBufferSize();
+ if(sz < 32768)
+ _sock.setReceiveBufferSize(32768);
+ try {
+ // We make it timeout every 100ms so that we can check
for
+ // _filters which have timed out, this
+ // is ugly but our only option without resorting to
java.nio
+ // because there is no way to forcefully
+ // interrupt a socket wait operation
+ _sock.setSoTimeout(100);
+ } catch (SocketException e) {
+ throw new RuntimeException(e);
+ }
+// }
+ // Only used for debugging, no need to seed from Yarrow
+ dropRandom = new Random();
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ }
+
+ /** Must be called, or we will NPE in run() */
+ public void setLowLevelFilter(IncomingPacketFilter f) {
+ lowLevelFilter = f;
+ }
+
+ public InetAddress getBindTo() {
+ return _bindTo;
+ }
+
+ public void run() { // Listen for packets
+ try {
+ runLoop();
+ } catch (Throwable t) {
+ // Impossible? It keeps on exiting. We get the below,
+ // but not this...
+ try {
+ System.err.print(t.getClass().getName());
+ System.err.println();
+ } catch (Throwable tt) {};
+ try {
+ System.err.print(t.getMessage());
+ System.err.println();
+ } catch (Throwable tt) {};
+ try {
+ System.gc();
+ System.runFinalization();
+ System.gc();
+ System.runFinalization();
+ } catch (Throwable tt) {}
+ try {
+ Runtime r = Runtime.getRuntime();
+ System.err.print(r.freeMemory());
+ System.err.println();
+ System.err.print(r.totalMemory());
+ System.err.println();
+ } catch (Throwable tt) {};
+ try {
+ t.printStackTrace();
+ } catch (Throwable tt) {};
+ } finally {
+ System.err.println("run() exiting");
+ Logger.error(this, "run() exiting");
+ synchronized (this) {
+ _isDone = true;
+ notifyAll();
+ }
+ }
+ }
+
+ private void runLoop() {
+ byte[] buf = new byte[MAX_RECEIVE_SIZE];
+ DatagramPacket packet = new DatagramPacket(buf, buf.length);
+ while (/*_active*/true) {
+ try {
+ lastTimeInSeconds = (int)
(System.currentTimeMillis() / 1000);
+ realRun(packet);
+ } catch (OutOfMemoryError e) {
+ OOMHandler.handleOOM(e);
+ System.err.println("Will retry above failed
operation...");
+ } catch (Throwable t) {
+ System.err.println("Caught "+t);
+ t.printStackTrace(System.err);
+ Logger.error(this, "Caught " + t, t);
+ }
+ }
+ }
+
+ private void realRun(DatagramPacket packet) {
+ // Single receiving thread
+ boolean gotPacket = getPacket(packet);
+ if (gotPacket) {
+ long startTime = System.currentTimeMillis();
+ Peer peer = new Peer(packet.getAddress(),
packet.getPort());
+ long endTime = System.currentTimeMillis();
+ if(endTime - startTime > 50) {
+ if(endTime-startTime > 3000)
+ Logger.error(this, "packet creation
took "+(endTime-startTime)+"ms");
+ else
+ if(logMINOR) Logger.minor(this, "packet
creation took "+(endTime-startTime)+"ms");
+ }
+ byte[] data = packet.getData();
+ int offset = packet.getOffset();
+ int length = packet.getLength();
+ try {
+ if(logMINOR) Logger.minor(this, "Processing
packet of length "+length+" from "+peer);
+ startTime = System.currentTimeMillis();
+ lowLevelFilter.process(data, offset, length,
peer);
+ endTime = System.currentTimeMillis();
+ if(endTime - startTime > 50) {
+ if(endTime-startTime > 3000)
+ Logger.error(this, "processing
packet took "+(endTime-startTime)+"ms");
+ else
+ if(logMINOR) Logger.minor(this,
"processing packet took "+(endTime-startTime)+"ms");
+ }
+ if(logMINOR) Logger.minor(this,
+ "Successfully handled packet
length " + length);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught " + t + " from "
+ + lowLevelFilter, t);
+ }
+ } else if(logMINOR) Logger.minor(this, "Null packet");
+ }
+
+ // FIXME necessary to deal with bugs around build 1000; arguably necessary
to deal with large node names in connection setup
+ // Revert to 1500?
+ private static final int MAX_RECEIVE_SIZE = 2048;
+
+ private boolean getPacket(DatagramPacket packet) {
+ try {
+ _sock.receive(packet);
+ // TODO: keep?
+ IOStatisticCollector.addInfo(packet.getAddress() + ":"
+ packet.getPort(),
+ packet.getLength(), 0);
+ } catch (SocketTimeoutException e1) {
+ return false;
+ } catch (IOException e2) {
+ throw new RuntimeException(e2);
+ }
+ if(logMINOR) Logger.minor(this, "Received packet");
+ return true;
+ }
+
+ /**
+ * Send a block of encoded bytes to a peer. This is called by
+ * send, and by IncomingPacketFilter.processOutgoing(..).
+ * @param blockToSend The data block to send.
+ * @param destination The peer to send it to.
+ */
+ public void sendPacket(byte[] blockToSend, Peer destination, boolean
allowLocalAddresses) throws LocalAddressException {
+ assert(blockToSend != null);
+ // there should be no DNS needed here, but go ahead if we can,
but complain doing it
+ if( destination.getAddress(false, allowLocalAddresses) == null
) {
+ Logger.error(this, "Tried sending to destination
without pre-looked up IP address(needs a real Peer.getHostname()): null:" +
destination.getPort(), new Exception("error"));
+ if( destination.getAddress(true, allowLocalAddresses)
== null ) {
+ Logger.error(this, "Tried sending to bad
destination address: null:" + destination.getPort(), new Exception("error"));
+ return;
+ }
+ }
+ if (_dropProbability > 0) {
+ if (dropRandom.nextInt() % _dropProbability == 0) {
+ if(logMINOR) Logger.minor(this, "DROPPED: " +
_sock.getLocalPort() + " -> " + destination.getPort());
+ return;
+ }
+ }
+ InetAddress address = destination.getAddress(false,
allowLocalAddresses);
+ assert(address != null);
+ int port = destination.getPort();
+ DatagramPacket packet = new DatagramPacket(blockToSend,
blockToSend.length);
+ packet.setAddress(address);
+ packet.setPort(port);
+
+ // TODO: keep?
+ // packet.length() is simply the size of the buffer, it knows
nothing of UDP headers
+ IOStatisticCollector.addInfo(address + ":" + port, 0,
blockToSend.length + UDP_HEADERS_LENGTH);
+
+ try {
+ _sock.send(packet);
+ } catch (IOException e) {
+ if(packet.getAddress() instanceof Inet6Address)
+ Logger.normal(this, "Error while sending packet
to IPv6 address: "+destination+": "+e, e);
+ else
+ Logger.error(this, "Error while sending packet
to " + destination+": "+e, e);
+ }
+ }
+
+ // CompuServe use 1400 MTU; AOL claim 1450; DFN at home use 1448.
+ // http://info.aol.co.uk/broadband/faqHomeNetworking.adp
+ //
http://www.compuserve.de/cso/hilfe/linux/hilfekategorien/installation/contentview.jsp?conid=385700
+ // http://www.studenten-ins-netz.net/inhalt/service_faq.html
+ // officially GRE is 1476 and PPPoE is 1492.
+ // unofficially, PPPoE is often 1472 (seen in the wild). Also PPPoATM
is sometimes 1472.
+ static final int MAX_ALLOWED_MTU = 1400;
+ // FIXME this is different for IPv6 (check all uses of constant when
fixing)
+ public static final int UDP_HEADERS_LENGTH = 28;
+
+ /**
+ * @return The maximum packet size supported by this SocketManager, not
including transport (UDP/IP) headers.
+ */
+ public int getMaxPacketSize() { //FIXME: what about passing a peerNode
though and doing it on a per-peer basis?
+ final int minAdvertisedMTU = node.ipDetector.getMinimumDetectedMTU();
+
+ // We don't want the MTU detection thingy to prevent us to send
PacketTransmits!
+ if(minAdvertisedMTU < 1100){
+ Logger.error(this, "It shouldn't happen : we disabled the MTU
detection algorithm because the advertised MTU is smallish !!
("+node.ipDetector.getMinimumDetectedMTU()+')');
+ return MAX_ALLOWED_MTU - UDP_HEADERS_LENGTH;
+ } else
+ return (minAdvertisedMTU < MAX_ALLOWED_MTU ? minAdvertisedMTU :
MAX_ALLOWED_MTU) - UDP_HEADERS_LENGTH;
+ // UDP/IP header is 28 bytes.
+ }
+
+ public void start() {
+ start(false);
+ }
+
+ public void start(boolean disableHangChecker) {
+ lastTimeInSeconds = (int) (System.currentTimeMillis() / 1000);
+ setDaemon(true);
+ setPriority(Thread.MAX_PRIORITY);
+ super.start();
+ if(!disableHangChecker) {
+ Thread checker = new Thread(new USMChecker(),
"MessageCore$USMChecker");
+ checker.setDaemon(true);
+ checker.setPriority(Thread.MAX_PRIORITY);
+ checker.start();
+ }
+ }
+
+ public class USMChecker implements Runnable {
+ public void run() {
+ while(true) {
+ logMINOR = Logger.shouldLog(Logger.MINOR,
UdpSocketHandler.this);
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ if(UdpSocketHandler.this.isAlive()) {
+ if(logMINOR) Logger.minor(this, "PING
on "+UdpSocketHandler.this);
+ long time = System.currentTimeMillis();
+ int timeSecs = (int) (time / 1000);
+ if(timeSecs - lastTimeInSeconds > 3*60)
{
+
+ // USM has hung.
+ // Probably caused by the
EvilJVMBug (see PacketSender).
+ // We'd better restart... :(
+
+ LoggingConfigHandler lch =
Node.logConfigHandler;
+ FileLoggerHook flh = lch ==
null ? null : lch.getFileLoggerHook();
+ boolean hasRedirected = flh ==
null ? false : flh.hasRedirectedStdOutErrNoLock();
+
+ if(!hasRedirected)
+
System.err.println("Restarting node: MessageCore froze for 3 minutes!");
+
+ try {
+
if(node.isUsingWrapper()){
+
WrapperManager.requestThreadDump();
+
WrapperManager.restart();
+ }else{
+
if(!hasRedirected)
+
System.err.println("Exiting on deadlock, but not running in the wrapper! Please
restart the node manually.");
+
+ // No wrapper :
we don't want to let it harm the network!
+ node.exit("USM
deadlock");
+ }
+ } catch (Throwable t) {
+ if(!hasRedirected) {
+
System.err.println("Error : can't restart the node : consider installing the
wrapper. PLEASE REPORT THAT ERROR TO devl at freenetproject.org");
+
t.printStackTrace();
+ }
+ node.exit("USM deadlock
and error");
+ }
+ }
+ } else {
+ Logger.error(this, "MAIN LOOP
TERMINATED");
+ System.err.println("MAIN LOOP
TERMINATED!");
+ node.exit(Node.EXIT_MAIN_LOOP_LOST);
+ }
+ }
+ }
+ }
+
+ public void close(boolean exit) {
+ Logger.error(this, "Closing.", new Exception("error"));
+ synchronized (this) {
+ while (!_isDone) {
+ try {
+ wait(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ if (exit) {
+ _sock.close();
+ } else {
+ Logger.fatal(this, 10, "Not implemented: close(false)");
+ //Updater.saveResource(_sock);
+ }
+ }
+
+ public int getDropProbability() {
+ return _dropProbability;
+ }
+
+ public void setDropProbability(int dropProbability) {
+ _dropProbability = dropProbability;
+ }
+
+ public int getPortNumber() {
+ return _sock.getLocalPort();
+ }
+
+ public String toString() {
+ return _sock.getLocalAddress() + ":" + _sock.getLocalPort();
+ }
+
+ public int getHeadersLength() {
+ return UDP_HEADERS_LENGTH;
+ }
+
+}
Deleted: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -1,853 +0,0 @@
-/*
- * Dijjer - A Peer to Peer HTTP Cache
- * Copyright (C) 2004,2005 Change.Tv, Inc
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-package freenet.io.comm;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-import org.tanukisoftware.wrapper.WrapperManager;
-
-import freenet.io.comm.Peer.LocalAddressException;
-import freenet.node.LoggingConfigHandler;
-import freenet.node.Node;
-import freenet.node.PeerNode;
-import freenet.support.FileLoggerHook;
-import freenet.support.Logger;
-import freenet.support.OOMHandler;
-import freenet.support.TimeUtil;
-
-public class UdpSocketManager extends Thread {
-
- public static final String VERSION = "$Id: UdpSocketManager.java,v 1.22
2005/08/25 17:28:19 amphibian Exp $";
- private static boolean logMINOR;
- private Dispatcher _dispatcher;
- private final DatagramSocket _sock;
- /** _filters serves as lock for both */
- private final LinkedList _filters = new LinkedList();
- private final LinkedList _unclaimed = new LinkedList();
- private int _dropProbability;
- private IncomingPacketFilter lowLevelFilter;
- /** RNG for debugging, used with _dropProbability.
- * NOT CRYPTO SAFE. DO NOT USE FOR THINGS THAT NEED CRYPTO SAFE RNG!
- */
- private Random dropRandom;
- private boolean _isDone;
- private static UdpSocketManager _usm;
- private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
- private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME =
60*60*1000; // 1 hour
- private volatile int lastTimeInSeconds;
- private final InetAddress _bindTo;
-
- // Icky layer violation, but we need to know the Node to work around
the EvilJVMBug.
- private final Node node;
-
- public void start() {
- start(false);
- }
-
- public void start(boolean disableHangChecker) {
- lastTimeInSeconds = (int) (System.currentTimeMillis() / 1000);
- setDaemon(true);
- setPriority(Thread.MAX_PRIORITY);
- super.start();
- if(!disableHangChecker) {
- Thread checker = new Thread(new USMChecker(),
"UdpSocketManager$USMChecker");
- checker.setDaemon(true);
- checker.setPriority(Thread.MAX_PRIORITY);
- checker.start();
- }
- }
-
- public class USMChecker implements Runnable {
- public void run() {
- while(true) {
- logMINOR = Logger.shouldLog(Logger.MINOR,
UdpSocketManager.this);
- try {
- Thread.sleep(10*1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- if(UdpSocketManager.this.isAlive()) {
- if(logMINOR) Logger.minor(this, "PING
on "+UdpSocketManager.this);
- long time = System.currentTimeMillis();
- int timeSecs = (int) (time / 1000);
- if(timeSecs - lastTimeInSeconds > 3*60)
{
-
- // USM has hung.
- // Probably caused by the
EvilJVMBug (see PacketSender).
- // We'd better restart... :(
-
- LoggingConfigHandler lch =
Node.logConfigHandler;
- FileLoggerHook flh = lch ==
null ? null : lch.getFileLoggerHook();
- boolean hasRedirected = flh ==
null ? false : flh.hasRedirectedStdOutErrNoLock();
-
- if(!hasRedirected)
-
System.err.println("Restarting node: UdpSocketManager froze for 3 minutes!");
-
- try {
-
if(node.isUsingWrapper()){
-
WrapperManager.requestThreadDump();
-
WrapperManager.restart();
- }else{
-
if(!hasRedirected)
-
System.err.println("Exiting on deadlock, but not running in the wrapper! Please
restart the node manually.");
-
- // No wrapper :
we don't want to let it harm the network!
- node.exit("USM
deadlock");
- }
- } catch (Throwable t) {
- if(!hasRedirected) {
-
System.err.println("Error : can't restart the node : consider installing the
wrapper. PLEASE REPORT THAT ERROR TO devl at freenetproject.org");
-
t.printStackTrace();
- }
- node.exit("USM deadlock
and error");
- }
- }
- } else {
- Logger.error(this, "MAIN LOOP
TERMINATED");
- System.err.println("MAIN LOOP
TERMINATED!");
- node.exit(Node.EXIT_MAIN_LOOP_LOST);
- }
- }
- }
- }
-
- public UdpSocketManager(int listenPort, InetAddress bindto, Node node)
throws SocketException {
- super("UdpSocketManager packet receiver thread on port " +
listenPort);
- this.node = node;
- _bindTo = bindto;
- // Keep the Updater code in, just commented out, for now
- // We may want to be able to do on-line updates.
-// if (Updater.hasResource()) {
-// _sock = (DatagramSocket) Updater.getResource();
-// } else {
- _sock = new DatagramSocket(listenPort, bindto);
- int sz = _sock.getReceiveBufferSize();
- if(sz < 32768)
- _sock.setReceiveBufferSize(32768);
- try {
- // We make it timeout every 100ms so that we can check
for
- // _filters which have timed out, this
- // is ugly but our only option without resorting to
java.nio
- // because there is no way to forcefully
- // interrupt a socket wait operation
- _sock.setSoTimeout(100);
- } catch (SocketException e) {
- throw new RuntimeException(e);
- }
-// }
- // Only used for debugging, no need to seed from Yarrow
- dropRandom = new Random();
- _timedOutFilters = new Vector(32);
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- }
-
- public InetAddress getBindTo() {
- return _bindTo;
- }
-
- public void run() { // Listen for packets
- try {
- runLoop();
- } catch (Throwable t) {
- // Impossible? It keeps on exiting. We get the below,
- // but not this...
- try {
- System.err.print(t.getClass().getName());
- System.err.println();
- } catch (Throwable tt) {};
- try {
- System.err.print(t.getMessage());
- System.err.println();
- } catch (Throwable tt) {};
- try {
- System.gc();
- System.runFinalization();
- System.gc();
- System.runFinalization();
- } catch (Throwable tt) {}
- try {
- Runtime r = Runtime.getRuntime();
- System.err.print(r.freeMemory());
- System.err.println();
- System.err.print(r.totalMemory());
- System.err.println();
- } catch (Throwable tt) {};
- try {
- t.printStackTrace();
- } catch (Throwable tt) {};
- } finally {
- System.err.println("run() exiting");
- Logger.error(this, "run() exiting");
- synchronized (this) {
- _isDone = true;
- notifyAll();
- }
- }
- }
-
- private void runLoop() {
- byte[] buf = new byte[MAX_RECEIVE_SIZE];
- DatagramPacket packet = new DatagramPacket(buf, buf.length);
- while (/*_active*/true) {
- try {
- lastTimeInSeconds = (int)
(System.currentTimeMillis() / 1000);
- realRun(packet);
- } catch (OutOfMemoryError e) {
- OOMHandler.handleOOM(e);
- System.err.println("Will retry above failed
operation...");
- } catch (Throwable t) {
- System.err.println("Caught "+t);
- t.printStackTrace(System.err);
- Logger.error(this, "Caught " + t, t);
- }
- }
- }
-
- private void realRun(DatagramPacket packet) {
- // Single receiving thread
- boolean gotPacket = getPacket(packet);
- // Check for timedout _filters
- removeTimedOutFilters();
- if (gotPacket) {
- long startTime = System.currentTimeMillis();
- Peer peer = new Peer(packet.getAddress(),
packet.getPort());
- long endTime = System.currentTimeMillis();
- if(endTime - startTime > 50) {
- if(endTime-startTime > 3000)
- Logger.error(this, "packet creation
took "+(endTime-startTime)+"ms");
- else
- if(logMINOR) Logger.minor(this, "packet
creation took "+(endTime-startTime)+"ms");
- }
- byte[] data = packet.getData();
- int offset = packet.getOffset();
- int length = packet.getLength();
- try {
- if(logMINOR) Logger.minor(this, "Processing
packet of length "+length+" from "+peer);
- startTime = System.currentTimeMillis();
- lowLevelFilter.process(data, offset, length,
peer);
- endTime = System.currentTimeMillis();
- if(endTime - startTime > 50) {
- if(endTime-startTime > 3000)
- Logger.error(this, "processing
packet took "+(endTime-startTime)+"ms");
- else
- if(logMINOR) Logger.minor(this,
"processing packet took "+(endTime-startTime)+"ms");
- }
- if(logMINOR) Logger.minor(this,
- "Successfully handled packet
length " + length);
- } catch (Throwable t) {
- Logger.error(this, "Caught " + t + " from "
- + lowLevelFilter, t);
- }
- } else if(logMINOR) Logger.minor(this, "Null packet");
- }
-
- /**
- * Decode a packet from data and a peer.
- * Can be called by IncomingPacketFilter's.
- * @param data
- * @param offset
- * @param length
- * @param peer
- */
- public Message decodeSingleMessage(byte[] data, int offset, int length,
PeerContext peer, int overhead) {
- try {
- return Message.decodeMessageFromPacket(data, offset, length, peer,
overhead);
- } catch (Throwable t) {
- Logger.error(this, "Could not decode packet: "+t, t);
- return null;
- }
- }
-
- // FIXME necessary to deal with bugs around build 1000; arguably necessary
to deal with large node names in connection setup
- // Revert to 1500?
- private static final int MAX_RECEIVE_SIZE = 2048;
-
- private boolean getPacket(DatagramPacket packet) {
- try {
- _sock.receive(packet);
- // TODO: keep?
- IOStatisticCollector.addInfo(packet.getAddress() + ":"
+ packet.getPort(),
- packet.getLength(), 0);
- } catch (SocketTimeoutException e1) {
- return false;
- } catch (IOException e2) {
- throw new RuntimeException(e2);
- }
- if(logMINOR) Logger.minor(this, "Received packet");
- return true;
- }
-
- /** Only used by removeTimedOutFilters() - if future code uses this
elsewhere, we need to
- * reconsider its locking. */
- private final Vector _timedOutFilters;
-
- /**
- * Remove timed out filters.
- * Only called by realRun(), so it can move timed out filters to the
_timedOutFilters array,
- * and then tell them that they are timed out without holding locks.
- *
- */
- private void removeTimedOutFilters() {
- long tStart = System.currentTimeMillis();
- synchronized (_filters) {
- for (ListIterator i = _filters.listIterator();
i.hasNext();) {
- MessageFilter f = (MessageFilter) i.next();
- if (f.timedOut()) {
- i.remove();
- _timedOutFilters.add(f);
- } else { // Because _filters are in order of
timeout, we
- // can abort the iteration as soon as
we find one that
- // doesn't timeout
- break;
- }
- }
- }
-
- for(int i=0;i<_timedOutFilters.size();i++) {
- MessageFilter f = (MessageFilter)
_timedOutFilters.get(i);
- f.setMessage(null);
- f.onTimedOut();
- }
- _timedOutFilters.clear();
-
- long tEnd = System.currentTimeMillis();
- if(tEnd - tStart > 50) {
- if(tEnd - tStart > 3000)
- Logger.error(this, "removeTimedOutFilters took
"+(tEnd-tStart)+"ms");
- else
- if(logMINOR) Logger.minor(this,
"removeTimedOutFilters took "+(tEnd-tStart)+"ms");
- }
- }
-
- /**
- * Dispatch a message to a waiting filter, or feed it to the
- * Dispatcher if none are found.
- * @param m The Message to dispatch.
- */
- public void checkFilters(Message m) {
- long tStart = System.currentTimeMillis();
- if(logMINOR) Logger.minor(this, "checkFilters: "+m+" from
"+m.getSource());
- if ((m.getSource()) instanceof PeerNode)
- {
-
((PeerNode)m.getSource()).addToLocalNodeReceivedMessagesFromStatistic(m);
- }
- boolean matched = false;
- if ((!(m.getSpec().equals(DMT.packetTransmit))) && logMINOR) {
- if ((m.getSpec().equals(DMT.ping) ||
m.getSpec().equals(DMT.pong)) && Logger.shouldLog(Logger.DEBUG, this)) {
- Logger.debug(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + _sock.getLocalPort() + " <- "
- + m.getSource() + " : " + m);
- } else {
- if(logMINOR) Logger.minor(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + _sock.getLocalPort() + " <- "
- + m.getSource() + " : " + m);
- }
- }
- MessageFilter match = null;
- synchronized (_filters) {
- for (ListIterator i = _filters.listIterator();
i.hasNext();) {
- MessageFilter f = (MessageFilter) i.next();
- if (f.match(m)) {
- matched = true;
- i.remove();
- match = f;
- if(logMINOR) Logger.minor(this,
"Matched: "+f);
- break; // Only one match permitted per
message
- }
- }
- }
- if(match != null) {
- match.setMessage(m);
- match.onMatched();
- }
- // Feed unmatched messages to the dispatcher
- if ((!matched) && (_dispatcher != null)) {
- try {
- if(logMINOR) Logger.minor(this, "Feeding to dispatcher:
"+m);
- matched = _dispatcher.handleMessage(m);
- } catch (Throwable t) {
- Logger.error(this, "Dispatcher threw "+t, t);
- }
- }
- // Keep the last few _unclaimed messages around in case the
intended receiver isn't receiving yet
- if (!matched) {
- if(logMINOR) Logger.minor(this, "Unclaimed: "+m);
- /** Check filters and then add to _unmatched is ATOMIC
- * It has to be atomic, because otherwise we can get a
- * race condition that results in timeouts on MFs.
- *
- * Specifically:
- * - Thread A receives packet
- * - Thread A checks filters. It doesn't match any.
- * - Thread A feeds to Dispatcher.
- * - Thread B creates filter.
- * - Thread B checks _unmatched.
- * - Thread B adds filter.
- * - Thread B sleeps.
- * - Thread A returns from Dispatcher. Which didn't match.
- * - Thread A adds to _unmatched.
- *
- * OOPS!
- * The only way to fix this is to have checking the
- * filters and unmatched be a single atomic operation.
- * Another race is possible if we merely recheck the
- * filters after we return from dispatcher, for example.
- */
- synchronized (_filters) {
- if(logMINOR) Logger.minor(this, "Rechecking
filters and adding message");
- for (ListIterator i = _filters.listIterator();
i.hasNext();) {
- MessageFilter f = (MessageFilter)
i.next();
- if (f.match(m)) {
- matched = true;
- match = f;
- i.remove();
- if(logMINOR) Logger.minor(this,
"Matched: "+f);
- break; // Only one match
permitted per message
- }
- }
- if(!matched) {
- while (_unclaimed.size() >
MAX_UNMATCHED_FIFO_SIZE) {
- Message removed =
(Message)_unclaimed.removeFirst();
- long messageLifeTime =
System.currentTimeMillis() - removed.localInstantiationTime;
- if ((removed.getSource()) instanceof
PeerNode) {
- Logger.normal(this, "Dropping
unclaimed from "+removed.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
- } else {
- Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+"
(quantity)"+": "+removed);
- }
- }
- _unclaimed.addLast(m);
- if(logMINOR) Logger.minor(this, "Done");
- }
- }
- if(match != null) {
- match.setMessage(m);
- match.onMatched();
- }
- }
- long tEnd = System.currentTimeMillis();
- if(tEnd - tStart > 50) {
- if(tEnd - tStart > 3000)
- Logger.error(this, "checkFilters took
"+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for
matched: "+matched);
- else
- if(logMINOR) Logger.minor(this, "checkFilters
took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for
matched: "+matched);
- }
- }
-
- /** IncomingPacketFilter should call this when a node is disconnected.
*/
- public void onDisconnect(PeerContext ctx) {
- Vector droppedFilters = null; // rare operation, we can waste
objects for better locking
- synchronized(_filters) {
- ListIterator i = _filters.listIterator();
- while (i.hasNext()) {
- MessageFilter f = (MessageFilter) i.next();
- if(f.matchesDroppedConnection(ctx)) {
- if(droppedFilters == null)
- droppedFilters = new Vector();
- droppedFilters.add(f);
- i.remove();
- }
- }
- }
- if(droppedFilters != null) {
- for(int i=0;i<droppedFilters.size();i++) {
- MessageFilter mf = (MessageFilter)
droppedFilters.get(i);
- mf.onDroppedConnection(ctx);
- }
- }
- }
-
- /** IncomingPacketFilter should call this when a node connects with a
new boot ID */
- public void onRestart(PeerContext ctx) {
- Vector droppedFilters = null; // rare operation, we can waste
objects for better locking
- synchronized(_filters) {
- ListIterator i = _filters.listIterator();
- while (i.hasNext()) {
- MessageFilter f = (MessageFilter) i.next();
- if(f.matchesRestartedConnection(ctx)) {
- if(droppedFilters == null)
- droppedFilters = new Vector();
- droppedFilters.add(f);
- i.remove();
- }
- }
- }
- if(droppedFilters != null) {
- for(int i=0;i<droppedFilters.size();i++) {
- MessageFilter mf = (MessageFilter)
droppedFilters.get(i);
- mf.onRestartedConnection(ctx);
- }
- }
- }
-
- public void addAsyncFilter(MessageFilter filter,
AsyncMessageFilterCallback callback) throws DisconnectedException {
- filter.setAsyncCallback(callback);
- filter.onStartWaiting();
- boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
- if(logDEBUG) Logger.debug(this, "Adding async filter "+filter+"
for "+callback);
- Message ret = null;
- if((lowLevelFilter != null) && (filter._source != null) &&
- filter.matchesDroppedConnection(filter._source) &&
- lowLevelFilter.isDisconnected(filter._source))
- throw new DisconnectedException();
- // Check to see whether the filter matches any of the recently
_unclaimed messages
- // Drop any _unclaimed messages that the filter doesn't match
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
- long now = System.currentTimeMillis();
- long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
- long messageLifeTime = 0;
- synchronized (_filters) {
- if(logMINOR) Logger.minor(this, "Checking _unclaimed");
- for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
- Message m = (Message) i.next();
- if (filter.match(m)) {
- i.remove();
- ret = m;
- if(logMINOR) Logger.debug(this,
"Matching from _unclaimed");
- break;
- } else if (m.localInstantiationTime <
messageDropTime) {
- i.remove();
- messageLifeTime = now -
m.localInstantiationTime;
- if ((m.getSource()) instanceof
PeerNode) {
- Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
- } else {
- Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+":
"+m);
- }
- }
- }
- if (ret == null) {
- if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
- // Insert filter into filter list in order of
timeout
- ListIterator i = _filters.listIterator();
- while (true) {
- if (!i.hasNext()) {
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added at end");
- break;
- }
- MessageFilter mf = (MessageFilter)
i.next();
- if (mf.getTimeout() >
filter.getTimeout()) {
- i.previous();
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
- break;
- }
- }
- }
- }
- if(ret != null) {
- filter.onMatched();
- filter.clearMatched();
- }
- }
-
- /**
- * Wait for a filter to trigger, or timeout. Blocks until either the
trigger is activated, or it times
- * out, or the peer is disconnected.
- * @param filter The filter to wait for.
- * @param ctr Byte counter to add bytes from the message to.
- * @return Either a message, or null if the filter timed out.
- * @throws DisconnectedException If the single peer being waited for
disconnects.
- */
- public Message waitFor(MessageFilter filter, ByteCounter ctr) throws
DisconnectedException {
- boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
- if(logDEBUG) Logger.debug(this, "Waiting for "+filter);
- long startTime = System.currentTimeMillis();
- filter.onStartWaiting();
- Message ret = null;
- if((lowLevelFilter != null) && (filter._source != null) &&
- filter.matchesDroppedConnection(filter._source) &&
- lowLevelFilter.isDisconnected(filter._source))
- throw new DisconnectedException();
- // Check to see whether the filter matches any of the recently
_unclaimed messages
- // Drop any _unclaimed messages that the filter doesn't match
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
- long now = System.currentTimeMillis();
- long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
- long messageLifeTime = 0;
- synchronized (_filters) {
- if(logMINOR) Logger.minor(this, "Checking _unclaimed");
- for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
- Message m = (Message) i.next();
- if (filter.match(m)) {
- i.remove();
- ret = m;
- if(logMINOR) Logger.debug(this,
"Matching from _unclaimed");
- break;
- } else if (m.localInstantiationTime <
messageDropTime) {
- i.remove();
- messageLifeTime = now -
m.localInstantiationTime;
- if ((m.getSource()) instanceof
PeerNode) {
- Logger.normal(this, "Dropping
unclaimed from "+m.getSource().getPeer()+", lived
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
- } else {
- Logger.normal(this, "Dropping
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+":
"+m);
- }
- }
- }
- if (ret == null) {
- if(logMINOR) Logger.minor(this, "Not in
_unclaimed");
- // Insert filter into filter list in order of
timeout
- ListIterator i = _filters.listIterator();
- while (true) {
- if (!i.hasNext()) {
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added at end");
- break;
- }
- MessageFilter mf = (MessageFilter)
i.next();
- if (mf.getTimeout() >
filter.getTimeout()) {
- i.previous();
- i.add(filter);
- if(logMINOR) Logger.minor(this,
"Added in middle - mf timeout="+mf.getTimeout()+" - my
timeout="+filter.getTimeout());
- break;
- }
- }
- }
- }
- long tEnd = System.currentTimeMillis();
- if(tEnd - now > 50) {
- if(tEnd - now > 3000)
- Logger.error(this, "waitFor _unclaimed
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+"
for ret of "+ret);
- else
- if(logMINOR) Logger.minor(this, "waitFor
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of
"+_unclaimed.size()+" for ret of "+ret);
- }
- // Unlock to wait on filter
- // Waiting on the filter won't release the outer lock
- // So we have to release it here
- if(ret == null) {
- if(logMINOR) Logger.minor(this, "Waiting...");
- synchronized (filter) {
- try {
- // Precaution against filter getting
matched between being added to _filters and
- // here - bug discovered by Mason
- boolean fmatched = false;
- while(!(fmatched = (filter.matched() ||
(filter.droppedConnection() != null)))) {
- long wait =
filter.getTimeout()-System.currentTimeMillis();
- if(wait > 0)
- filter.wait(wait);
- else break;
- }
- if(filter.droppedConnection() != null)
- throw new DisconnectedException();
- if(logMINOR) Logger.minor(this, "Matched:
"+fmatched);
- } catch (InterruptedException e) {
- }
- ret = filter.getMessage();
- filter.clearMatched();
- }
- if(logDEBUG) Logger.debug(this, "Returning "+ret+" from
"+filter);
- } else {
- // Matched an unclaimed packet
- filter.onMatched();
- filter.clearMatched();
- }
- // Probably get rid...
-// if (Dijjer.getDijjer().getDumpMessageWaitTimes() != null) {
-//
Dijjer.getDijjer().getDumpMessageWaitTimes().println(filter.toString() + "\t" +
filter.getInitialTimeout() + "\t"
-// + (System.currentTimeMillis() -
startTime));
-// Dijjer.getDijjer().getDumpMessageWaitTimes().flush();
-// }
- long endTime = System.currentTimeMillis();
- if(logDEBUG) Logger.debug(this, "Returning in
"+(endTime-startTime)+"ms");
- if((ctr != null) && (ret != null))
- ctr.receivedBytes(ret._receivedByteCount);
- return ret;
- }
-
- /**
- * Send a Message to a PeerContext.
- * @throws NotConnectedException If we are not currently connected to
the node.
- */
- public void send(PeerContext destination, Message m, ByteCounter ctr)
throws NotConnectedException {
- if(m.getSpec().isInternalOnly()) {
- Logger.error(this, "Trying to send internal-only message "+m+"
of spec "+m.getSpec(), new Exception("debug"));
- return;
- }
- if ((m.getSpec().equals(DMT.ping) ||
m.getSpec().equals(DMT.pong)) && logMINOR) {
- if(Logger.shouldLog(Logger.DEBUG, this))
- Logger.debug(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + _sock.getPort() + " -> " +
destination
- + " : " + m);
- } else {
- if(logMINOR) Logger.minor(this, "" +
(System.currentTimeMillis() % 60000) + ' ' + _sock.getPort() + " -> " +
destination
- + " : " + m);
- }
-// byte[] blockToSend = m.encodeToPacket(lowLevelFilter,
destination);
-// if(lowLevelFilter != null) {
-// try {
-// lowLevelFilter.processOutgoing(blockToSend, 0,
blockToSend.length, destination);
-// return;
-// } catch (IncomingPacketFilterException t) {
-// Logger.error(this, "Caught "+t+" sending "+m+"
to "+destination, t);
-// destination.forceDisconnect();
-// throw new NotConnectedException("Error
"+t.toString()+" forced disconnect");
-// }
-// } else {
-// sendPacket(blockToSend, destination.getPeer());
-// }
- destination.sendAsync(m, null, 0, ctr);
- }
-
- /**
- * Send a block of encoded bytes to a peer. This is called by
- * send, and by IncomingPacketFilter.processOutgoing(..).
- * @param blockToSend The data block to send.
- * @param destination The peer to send it to.
- */
- public void sendPacket(byte[] blockToSend, Peer destination, boolean
allowLocalAddresses) throws LocalAddressException {
- assert(blockToSend != null);
- // there should be no DNS needed here, but go ahead if we can,
but complain doing it
- if( destination.getAddress(false, allowLocalAddresses) == null
) {
- Logger.error(this, "Tried sending to destination
without pre-looked up IP address(needs a real Peer.getHostname()): null:" +
destination.getPort(), new Exception("error"));
- if( destination.getAddress(true, allowLocalAddresses)
== null ) {
- Logger.error(this, "Tried sending to bad
destination address: null:" + destination.getPort(), new Exception("error"));
- return;
- }
- }
- if (_dropProbability > 0) {
- if (dropRandom.nextInt() % _dropProbability == 0) {
- if(logMINOR) Logger.minor(this, "DROPPED: " +
_sock.getLocalPort() + " -> " + destination.getPort());
- return;
- }
- }
- InetAddress address = destination.getAddress(false,
allowLocalAddresses);
- assert(address != null);
- int port = destination.getPort();
- DatagramPacket packet = new DatagramPacket(blockToSend,
blockToSend.length);
- packet.setAddress(address);
- packet.setPort(port);
-
- // TODO: keep?
- // packet.length() is simply the size of the buffer, it knows
nothing of UDP headers
- IOStatisticCollector.addInfo(address + ":" + port, 0,
blockToSend.length + UDP_HEADERS_LENGTH);
-
- try {
- _sock.send(packet);
- } catch (IOException e) {
- if(packet.getAddress() instanceof Inet6Address)
- Logger.normal(this, "Error while sending packet
to IPv6 address: "+destination+": "+e, e);
- else
- Logger.error(this, "Error while sending packet
to " + destination+": "+e, e);
- }
- }
-
- public void close(boolean exit) {
- Logger.error(this, "Closing.", new Exception("error"));
- synchronized (this) {
- while (!_isDone) {
- try {
- wait(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- if (exit) {
- _sock.close();
- } else {
- Logger.fatal(this, 10, "Not implemented: close(false)");
- //Updater.saveResource(_sock);
- }
- }
-
- public void setDispatcher(Dispatcher d) {
- _dispatcher = d;
- }
-
- /** Must be called, or we will NPE */
- public void setLowLevelFilter(IncomingPacketFilter f) {
- lowLevelFilter = f;
- }
-
- public String toString() {
- return _sock.getLocalAddress() + ":" + _sock.getLocalPort();
- }
-
- public void setDropProbability(int dropProbability) {
- _dropProbability = dropProbability;
- }
-
- public static UdpSocketManager getUdpSocketManager()
- {
- return _usm;
- }
-
-// public static void init(int externalListenPort, InetAddress bindto)
-// throws SocketException
-// {
-// _usm = new UdpSocketManager(externalListenPort, bindto);
-// }
-//
- public int getPortNumber() {
- return _sock.getLocalPort();
- }
-
-
- // CompuServe use 1400 MTU; AOL claim 1450; DFN at home use 1448.
- // http://info.aol.co.uk/broadband/faqHomeNetworking.adp
- //
http://www.compuserve.de/cso/hilfe/linux/hilfekategorien/installation/contentview.jsp?conid=385700
- // http://www.studenten-ins-netz.net/inhalt/service_faq.html
- // officially GRE is 1476 and PPPoE is 1492.
- // unofficially, PPPoE is often 1472 (seen in the wild). Also PPPoATM
is sometimes 1472.
- static final int MAX_ALLOWED_MTU = 1400;
- // FIXME this is different for IPv6 (check all uses of constant when
fixing)
- public static final int UDP_HEADERS_LENGTH = 28;
-
- /**
- * @return The maximum packet size supported by this SocketManager, not
including transport (UDP/IP) headers.
- */
- public int getMaxPacketSize() { //FIXME: what about passing a peerNode
though and doing it on a per-peer basis?
- final int minAdvertisedMTU = node.ipDetector.getMinimumDetectedMTU();
-
- // We don't want the MTU detection thingy to prevent us to send
PacketTransmits!
- if(minAdvertisedMTU < 1100){
- Logger.error(this, "It shouldn't happen : we disabled the MTU
detection algorithm because the advertised MTU is smallish !!
("+node.ipDetector.getMinimumDetectedMTU()+')');
- return MAX_ALLOWED_MTU - UDP_HEADERS_LENGTH;
- } else
- return (minAdvertisedMTU < MAX_ALLOWED_MTU ? minAdvertisedMTU :
MAX_ALLOWED_MTU) - UDP_HEADERS_LENGTH;
- // UDP/IP header is 28 bytes.
- }
-
- /**
- * @return the number of received messages that are currently unclaimed
- */
- public int getUnclaimedFIFOSize() {
- synchronized (_filters){
- return _unclaimed.size();
- }
- }
-
- public Map getUnclaimedFIFOMessageCounts() {
- Map messageCounts = new HashMap();
- synchronized(_filters) {
- for (ListIterator i = _unclaimed.listIterator();
i.hasNext();) {
- Message m = (Message) i.next();
- String messageName = m.getSpec().getName();
- Integer messageCount = (Integer)
messageCounts.get(messageName);
- if (messageCount == null) {
- messageCounts.put(messageName, new
Integer(1) );
- } else {
- messageCount = new
Integer(messageCount.intValue() + 1);
- messageCounts.put(messageName,
messageCount );
- }
- }
- }
- return messageCounts;
- }
-
- public int getDropProbability() {
- return _dropProbability;
- }
-}
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -29,7 +29,7 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.io.comm.RetrievalException;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.MessageCore;
import freenet.support.BitArray;
import freenet.support.Buffer;
import freenet.support.Logger;
@@ -47,12 +47,12 @@
PartiallyReceivedBlock _prb;
PeerContext _sender;
long _uid;
- UdpSocketManager _usm;
+ MessageCore _usm;
/** packet : Integer -> reportTime : Long * */
HashMap _recentlyReportedMissingPackets = new HashMap();
ByteCounter _ctr;
- public BlockReceiver(UdpSocketManager usm, PeerContext sender, long
uid, PartiallyReceivedBlock prb, ByteCounter ctr) {
+ public BlockReceiver(MessageCore usm, PeerContext sender, long uid,
PartiallyReceivedBlock prb, ByteCounter ctr) {
_sender = sender;
_prb = prb;
_uid = uid;
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -26,11 +26,10 @@
import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
+import freenet.io.comm.MessageCore;
import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
-import freenet.io.comm.UdpSocketManager;
-import freenet.node.FNPPacketMangler;
import freenet.node.PeerNode;
import freenet.support.BitArray;
import freenet.support.DoubleTokenBucket;
@@ -46,7 +45,7 @@
public static final int SEND_TIMEOUT = 60000;
public static final int PING_EVERY = 8;
- UdpSocketManager _usm;
+ MessageCore _usm;
PeerContext _destination;
boolean _sendComplete;
long _uid;
@@ -61,7 +60,7 @@
final ByteCounter _ctr;
final int PACKET_SIZE;
- public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle,
ByteCounter ctr) {
+ public BlockTransmitter(MessageCore usm, PeerContext destination, long
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle,
ByteCounter ctr) {
_usm = usm;
_destination = destination;
_uid = uid;
@@ -69,7 +68,7 @@
_ctr = ctr;
_masterThrottle = masterThrottle;
PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize,
_prb._packets)
- + FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE;
+ +
destination.getOutgoingMangler().fullHeadersLengthOneMessage();
try {
_sentPackets = new BitArray(_prb.getNumPackets());
} catch (AbortedException e) {
@@ -232,7 +231,7 @@
}
if(logMINOR) Logger.minor(this, "Got "+msg);
if(!_destination.isConnected()) {
- Logger.normal(this, "Terminating send
"+_uid+" to "+_destination+" from "+_usm.getPortNumber()+" because node
disconnected while waiting");
+ Logger.normal(this, "Terminating send
"+_uid+" to "+_destination+" from "+_destination.getSocketHandler()+" because
node disconnected while waiting");
synchronized(_senderThread) {
_sendComplete = true;
_senderThread.notifyAll();
@@ -249,7 +248,7 @@
_sendComplete = true;
_senderThread.notifyAll();
}
- Logger.error(this, "Terminating
send "+_uid+" to "+_destination+" from "+_usm.getPortNumber()+" as we haven't
heard from receiver in "+TimeUtil.formatTime((now - timeAllSent), 2, true)+
'.');
+ Logger.error(this, "Terminating
send "+_uid+" to "+_destination+" from "+_destination.getSocketHandler()+" as
we haven't heard from receiver in "+TimeUtil.formatTime((now - timeAllSent), 2,
true)+ '.');
return false;
} else {
if(logMINOR) Logger.minor(this,
"Ignoring timeout: timeAllSent="+timeAllSent+" ("+(System.currentTimeMillis() -
timeAllSent)+"), getNumSent="+getNumSent()+ '/' +_prb.getNumPackets());
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -41,6 +41,7 @@
final DoubleTokenBucket masterThrottle;
private boolean sentCancel;
private boolean finished;
+ final int packetSize;
public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer,
long uid, DoubleTokenBucket masterThrottle) throws DisconnectedException {
this.prb = prb;
@@ -88,6 +89,8 @@
cancel();
throw e;
}
+ packetSize = DMT.bulkPacketTransmitSize(prb.blockSize) +
+ peer.getOutgoingMangler().fullHeadersLengthOneMessage();
}
/**
@@ -146,7 +149,6 @@
* @return True if the file was successfully sent. False otherwise.
*/
public boolean send() {
- int packetSize = prb.getPacketSize();
long lastSentPacket = System.currentTimeMillis();
while(true) {
if(prb.isAborted()) return false;
Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-06-27 17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
2007-06-27 21:00:20 UTC (rev 13784)
@@ -5,10 +5,8 @@
import java.io.IOException;
-import freenet.io.comm.DMT;
+import freenet.io.comm.MessageCore;
import freenet.io.comm.RetrievalException;
-import freenet.io.comm.UdpSocketManager;
-import freenet.node.FNPPacketMangler;
import freenet.support.BitArray;
import freenet.support.Logger;
import freenet.support.io.RandomAccessThing;
@@ -31,11 +29,10 @@
private final BitArray blocksReceived;
final int blocks;
private BulkTransmitter[] transmitters;
- final UdpSocketManager usm;
+ final MessageCore usm;
/** The one and only BulkReceiver */
private BulkReceiver recv;
private int blocksReceivedCount;
- final int packetSize;
// Abort status
boolean _aborted;
int _abortReason;
@@ -49,7 +46,7 @@
* @param initialState If true, assume all blocks have been received.
If false, assume no blocks have
* been received.
*/
- public PartiallyReceivedBulk(UdpSocketManager usm, long size, int
blockSize, RandomAccessThing raf, boolean initialState) {
+ public PartiallyReceivedBulk(MessageCore usm, long size, int blockSize,
RandomAccessThing raf, boolean initialState) {
this.size = size;
this.blockSize = blockSize;
this.raf = raf;
@@ -63,8 +60,6 @@
blocksReceived.setAllOnes();
blocksReceivedCount = this.blocks;
}
- packetSize = DMT.bulkPacketTransmitSize(blockSize) +
- FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE; //
FIXME generalise
}
/**
@@ -153,10 +148,6 @@
return _aborted;
}
- public int getPacketSize() {
- return packetSize;
- }
-
public boolean hasWholeFile() {
return blocksReceivedCount >= blocks;
}
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -38,7 +38,8 @@
private static boolean logMINOR;
final Node node;
final PeerManager pm;
- final UdpSocketManager usm;
+ final MessageCore usm;
+ final PacketSocketHandler sock;
final EntropySource fnpTimingSource;
final EntropySource myPacketDataSource;
private static final int MAX_PACKETS_IN_FLIGHT = 256;
@@ -64,17 +65,18 @@
static public final int HEADERS_LENGTH_ONE_MESSAGE =
HEADERS_LENGTH_MINIMUM + 2; // 2 bytes = length of message.
rest is the same.
- static public final int FULL_HEADERS_LENGTH_MINIMUM =
- HEADERS_LENGTH_MINIMUM + UdpSocketManager.UDP_HEADERS_LENGTH;
- static public final int FULL_HEADERS_LENGTH_ONE_MESSAGE =
- HEADERS_LENGTH_ONE_MESSAGE +
UdpSocketManager.UDP_HEADERS_LENGTH;
-
- public FNPPacketMangler(Node node) {
+ final int fullHeadersLengthMinimum;
+ final int fullHeadersLengthOneMessage;
+
+ public FNPPacketMangler(Node node, PacketSocketHandler sock) {
this.node = node;
this.pm = node.peers;
this.usm = node.usm;
+ this.sock = sock;
fnpTimingSource = new EntropySource();
myPacketDataSource = new EntropySource();
+ fullHeadersLengthMinimum = HEADERS_LENGTH_MINIMUM +
sock.getHeadersLength();
+ fullHeadersLengthOneMessage = HEADERS_LENGTH_ONE_MESSAGE +
sock.getHeadersLength();
}
/**
@@ -432,7 +434,7 @@
*/
private void sendAuthPacket(byte[] output, PeerNode pn, Peer replyTo) {
int length = output.length;
- if(length > node.usm.getMaxPacketSize()) {
+ if(length > sock.getMaxPacketSize()) {
throw new IllegalStateException("Cannot send auth packet: too
long: "+length);
}
BlockCipher cipher = pn.outgoingSetupCipher;
@@ -469,7 +471,7 @@
Peer p = pn.getPeer();
if(p != null) replyTo = p;
}
- usm.sendPacket(data, replyTo, pn.allowLocalAddresses());
+ sock.sendPacket(data, replyTo, pn.allowLocalAddresses());
pn.reportOutgoingBytes(data.length);
node.outputThrottle.forceGrab(data.length - alreadyReportedBytes);
}
@@ -889,7 +891,7 @@
ptr+=length;
if(m != null) {
//Logger.minor(this, "Dispatching packet: "+m);
- usm.checkFilters(m);
+ usm.checkFilters(m, sock);
}
}
if(logMINOR) Logger.minor(this, "Done");
@@ -932,7 +934,7 @@
}
int packetNumber =
kt.allocateOutgoingPacketNumberNeverBlock();
this.processOutgoingPreformatted(buf, 0, buf.length,
pn.getCurrentKeyTracker(), packetNumber, mi.cb, mi.alreadyReportedBytes);
- mi.onSent(buf.length + FULL_HEADERS_LENGTH_ONE_MESSAGE);
+ mi.onSent(buf.length + fullHeadersLengthOneMessage);
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending messages
("+mi_name+") to "+pn.getPeer()+requeueLogString);
// Requeue
@@ -969,7 +971,7 @@
} else {
byte[] data = mi.getData(pn);
messageData[x] = data;
- if(data.length > node.usm.getMaxPacketSize()) {
+ if(data.length > sock.getMaxPacketSize()) {
Logger.error(this, "Message exceeds packet size:
"+messages[i]+" size "+data.length+" message "+mi.msg);
// Will be handled later
}
@@ -1003,7 +1005,7 @@
}
if(x != callbacksCount) throw new IllegalStateException();
- if((length + HEADERS_LENGTH_MINIMUM < node.usm.getMaxPacketSize()) &&
+ if((length + HEADERS_LENGTH_MINIMUM < sock.getMaxPacketSize()) &&
(messageData.length < 256)) {
mi_name = null;
try {
@@ -1011,7 +1013,7 @@
for(int i=0;i<messageData.length;i++) {
MessageItem mi = newMsgs[i];
mi_name = (mi.msg == null ? "(not a
Message)" : mi.msg.getSpec().getName());
- mi.onSent(messageData[i].length + 2 +
(FULL_HEADERS_LENGTH_MINIMUM / messageData.length));
+ mi.onSent(messageData[i].length + 2 +
(fullHeadersLengthMinimum / messageData.length));
}
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending messages
("+mi_name+") to "+pn.getPeer()+requeueLogString);
@@ -1048,7 +1050,7 @@
else thisLength = (messageData[i].length + 2);
int newLength = length + thisLength;
count++;
- if((newLength + HEADERS_LENGTH_MINIMUM >
node.usm.getMaxPacketSize()) || (count > 255) || (i == messages.length)) {
+ if((newLength + HEADERS_LENGTH_MINIMUM >
sock.getMaxPacketSize()) || (count > 255) || (i == messages.length)) {
// lastIndex up to the message right before this one
// e.g. lastIndex = 0, i = 1, we just send message 0
if(lastIndex != i) {
@@ -1058,7 +1060,7 @@
for(int j=lastIndex;j<i;j++) {
MessageItem mi = newMsgs[j];
mi_name =
(mi.msg == null ? "(not a Message)" : mi.msg.getSpec().getName());
-
mi.onSent(messageData[j].length + 2 + (FULL_HEADERS_LENGTH_MINIMUM /
(i-lastIndex)));
+
mi.onSent(messageData[j].length + 2 + (fullHeadersLengthMinimum /
(i-lastIndex)));
}
} catch (NotConnectedException e) {
Logger.normal(this, "Caught "+e+" while sending
messages ("+mi_name+") to "+pn.getPeer()+requeueLogString);
@@ -1548,4 +1550,12 @@
public int[] supportedNegTypes() {
return new int[] { 1 };
}
+
+ public int fullHeadersLengthOneMessage() {
+ return fullHeadersLengthOneMessage;
+ }
+
+ public SocketHandler getSocketHandler() {
+ return sock;
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2007-06-27 17:55:09 UTC (rev
13783)
+++ trunk/freenet/src/freenet/node/Node.java 2007-06-27 21:00:20 UTC (rev
13784)
@@ -62,11 +62,12 @@
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.FreenetInetAddress;
import freenet.io.comm.Message;
+import freenet.io.comm.MessageCore;
import freenet.io.comm.MessageFilter;
import freenet.io.comm.Peer;
import freenet.io.comm.PeerParseException;
import freenet.io.comm.ReferenceSignatureVerificationException;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.UdpSocketHandler;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
@@ -369,7 +370,10 @@
public final RandomSource random;
/** Weak but fast RNG */
public final Random fastWeakRandom;
- final UdpSocketManager usm;
+ /** The object which handles incoming messages and allows us to wait
for them */
+ final MessageCore usm;
+ /** The object which handles our specific UDP port, pulls messages from
it, feeds them to the packet mangler for decryption etc */
+ final UdpSocketHandler sock;
final FNPPacketMangler packetMangler;
final DNSRequester dnsr;
public final PacketSender ps;
@@ -791,8 +795,10 @@
port=-1;
}
- UdpSocketManager u = null;
+ usm = new MessageCore();
+ UdpSocketHandler u = null;
+
if(port > 65535) {
throw new NodeInitException(EXIT_IMPOSSIBLE_USM_PORT,
"Impossible port number: "+port);
} else if(port == -1) {
@@ -800,7 +806,7 @@
for(int i=0;i<200000;i++) {
int portNo = 1024 + random.nextInt(65535-1024);
try {
- u = new UdpSocketManager(portNo,
InetAddress.getByName(bindto), this);
+ u = new UdpSocketHandler(portNo,
InetAddress.getByName(bindto), this);
port = u.getPortNumber();
break;
} catch (Exception e) {
@@ -814,12 +820,12 @@
throw new
NodeInitException(EXIT_NO_AVAILABLE_UDP_PORTS, "Could not find an available UDP
port number for FNP (none specified)");
} else {
try {
- u = new UdpSocketManager(port,
InetAddress.getByName(bindto), this);
+ u = new UdpSocketHandler(port,
InetAddress.getByName(bindto), this);
} catch (Exception e) {
throw new
NodeInitException(EXIT_IMPOSSIBLE_USM_PORT, "Could not bind to port: "+port+"
(node already running?)");
}
}
- usm = u;
+ sock = u;
Logger.normal(this, "FNP port created on "+bindto+ ':' +port);
System.out.println("FNP port created on "+bindto+ ':' +port);
@@ -829,17 +835,17 @@
new IntCallback() {
public int get() {
- return usm.getDropProbability();
+ return
((UdpSocketHandler)sock).getDropProbability();
}
public void set(int val) throws
InvalidConfigValueException {
- usm.setDropProbability(val);
+
((UdpSocketHandler)sock).setDropProbability(val);
}
});
int dropProb = nodeConfig.getInt("testingDropPacketsEvery");
- usm.setDropProbability(dropProb);
+ ((UdpSocketHandler)sock).setDropProbability(dropProb);
Logger.normal(Node.class, "Creating node...");
@@ -995,7 +1001,7 @@
peers.updatePMUserAlert();
usm.setDispatcher(dispatcher=new NodeDispatcher(this));
- usm.setLowLevelFilter(packetMangler = new
FNPPacketMangler(this));
+ sock.setLowLevelFilter(packetMangler = new
FNPPacketMangler(this, sock));
// Extra Peer Data Directory
nodeConfig.register("extraPeerDataDir", new File(nodeDir,
"extra-peer-data-"+portNumber).toString(), sortOrder++, true, false,
"Node.extraPeerDir", "Node.extraPeerDirLong",
@@ -1439,8 +1445,10 @@
ps.start(nodeStats);
peers.start(); // must be before usm
nodeStats.start();
- usm.start(disableHangCheckers);
+ usm.start(ps);
+ ((UdpSocketHandler)sock).start(disableHangCheckers);
+
if(isUsingWrapper()) {
Logger.normal(this, "Using wrapper correctly:
"+nodeStarter);
System.out.println("Using wrapper correctly:
"+nodeStarter);
@@ -2663,7 +2671,7 @@
return myName;
}
- public UdpSocketManager getUSM() {
+ public MessageCore getUSM() {
return usm;
}
Modified: trunk/freenet/src/freenet/node/NodeIPDetector.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeIPDetector.java 2007-06-27 17:55:09 UTC
(rev 13783)
+++ trunk/freenet/src/freenet/node/NodeIPDetector.java 2007-06-27 21:00:20 UTC
(rev 13784)
@@ -11,7 +11,7 @@
import freenet.config.SubConfig;
import freenet.io.comm.FreenetInetAddress;
import freenet.io.comm.Peer;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.MessageCore;
import freenet.l10n.L10n;
import freenet.node.useralerts.IPUndetectedUserAlert;
import freenet.node.useralerts.SimpleUserAlert;
@@ -86,9 +86,9 @@
addedValidIP = true;
}
boolean dontDetect = false;
- UdpSocketManager usm = node.usm;
+ MessageCore usm = node.usm;
if(usm != null) {
- InetAddress addr = node.usm.getBindTo();
+ InetAddress addr = node.sock.getBindTo();
if(addr != null && (IPUtil.isValidAddress(addr,
false))) {
dontDetect = true;
Peer p = new Peer(addr, node.portNumber);
Modified: trunk/freenet/src/freenet/node/NodeStats.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeStats.java 2007-06-27 17:55:09 UTC
(rev 13783)
+++ trunk/freenet/src/freenet/node/NodeStats.java 2007-06-27 21:00:20 UTC
(rev 13784)
@@ -302,6 +302,9 @@
new TokenBucket(Math.max(obwLimit*60, 32768*20),
(int)((1000L*1000L*1000L) / (obwLimit *
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
requestInputThrottle =
new TokenBucket(Math.max(ibwLimit*60, 32768*20),
(int)((1000L*1000L*1000L) / (ibwLimit *
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
+
+ estimatedSizeOfOneThrottledPacket = 1024 +
DMT.packetTransmitSize(1024, 32) +
+ node.packetMangler.fullHeadersLengthOneMessage();
}
protected String l10n(String key) {
@@ -321,9 +324,7 @@
private long lastAcceptedRequest = -1;
- static final int ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET =
- 1024 + DMT.packetTransmitSize(1024, 32)
- + FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE;
+ final int estimatedSizeOfOneThrottledPacket;
final Runnable throttledPacketSendAverageIdleUpdater =
new Runnable() {
@@ -332,8 +333,8 @@
try {
if(throttledPacketSendAverage.lastReportTime() < now - 5000) { // if last
report more than 5 seconds ago
// shouldn't take long
-
node.outputThrottle.blockingGrab(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
-
node.outputThrottle.recycle(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
+
node.outputThrottle.blockingGrab(estimatedSizeOfOneThrottledPacket);
+
node.outputThrottle.recycle(estimatedSizeOfOneThrottledPacket);
long after =
System.currentTimeMillis();
// Report time it takes to grab
the bytes.
throttledPacketSendAverage.report(after - now);
Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2007-06-27
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2007-06-27
21:00:20 UTC (rev 13784)
@@ -6,6 +6,7 @@
import freenet.io.comm.AsyncMessageCallback;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
+import freenet.io.comm.SocketHandler;
import freenet.support.WouldBlockException;
/**
@@ -78,5 +79,15 @@
* List of supported negotiation types in preference order (best last)
*/
public int[] supportedNegTypes();
+
+ /**
+ * Size of the packet headers, in bytes, assuming only one message in
this packet.
+ */
+ public int fullHeadersLengthOneMessage();
+ /**
+ * The SocketHandler we are connected to.
+ */
+ public SocketHandler getSocketHandler();
+
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2007-06-27 17:55:09 UTC
(rev 13783)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2007-06-27 21:00:20 UTC
(rev 13784)
@@ -55,6 +55,7 @@
import freenet.io.comm.PeerParseException;
import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.io.comm.RetrievalException;
+import freenet.io.comm.SocketHandler;
import freenet.io.xfer.BulkReceiver;
import freenet.io.xfer.BulkTransmitter;
import freenet.io.xfer.PacketThrottle;
@@ -113,6 +114,9 @@
/** My low-level address for SocketManager purposes */
private Peer detectedPeer;
+ /** My OutgoingPacketMangler i.e. the object which encrypts packets sent
to this node */
+ private OutgoingPacketMangler outgoingMangler;
+
/** Advertised addresses */
private Vector nominalPeer;
@@ -3954,4 +3958,12 @@
// Ignore
}
}
+
+ public OutgoingPacketMangler getOutgoingMangler() {
+ return outgoingMangler;
+ }
+
+ public SocketHandler getSocketHandler() {
+ return outgoingMangler.getSocketHandler();
+ }
}
Modified:
trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java
2007-06-27 17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java
2007-06-27 21:00:20 UTC (rev 13784)
@@ -41,7 +41,7 @@
new File(wd).mkdir();
NodeStarter.globalTestInit(wd); // ignore Random, using our own
// Don't clobber nearby nodes!
- Logger.setupStdoutLogging(Logger.DEBUG,
"freenet.store:minor,freenet.node.Location:normal"
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.UdpSocketManager:debug"*/);
+ Logger.setupStdoutLogging(Logger.DEBUG,
"freenet.store:minor,freenet.node.Location:normal"
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.MessageCore:debug"*/);
Logger.globalSetThreshold(Logger.DEBUG);
System.out.println("Insert/retrieve test");
System.out.println();
Modified: trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java
===================================================================
--- trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java
2007-06-27 17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java
2007-06-27 21:00:20 UTC (rev 13784)
@@ -33,7 +33,7 @@
static final short MAX_HTL = (short)6;
public static void main(String[] args) throws FSParseException,
PeerParseException, InvalidThresholdException, NodeInitException,
ReferenceSignatureVerificationException {
- Logger.setupStdoutLogging(Logger.NORMAL,
"freenet.node.CPUAdjustingSwapRequestInterval:minor"
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.UdpSocketManager:debug"*/);
+ Logger.setupStdoutLogging(Logger.NORMAL,
"freenet.node.CPUAdjustingSwapRequestInterval:minor"
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.MessageCore:debug"*/);
System.out.println("Routing test using real nodes:");
System.out.println();
String wd = "realNodeRequestInsertTest";