Author: toad
Date: 2008-01-12 21:54:21 +0000 (Sat, 12 Jan 2008)
New Revision: 17035

Modified:
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/ResendPacketItem.java
   trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java
   trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java
Log:
MessageItem.msg may be null. We need to pass around the priority explicitly...

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-01-12 
21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-01-12 
21:54:21 UTC (rev 17035)
@@ -27,6 +27,7 @@
 import freenet.crypt.UnsupportedCipherException;
 import freenet.crypt.ciphers.Rijndael;
 import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.DMT;
 import freenet.io.comm.FreenetInetAddress;
 import freenet.io.comm.IncomingPacketFilter;
 import freenet.io.comm.Message;
@@ -1982,7 +1983,7 @@
                                                return;
                                        }
                                        int packetNumber = 
kt.allocateOutgoingPacketNumberNeverBlock();
-                                       this.processOutgoingPreformatted(buf, 
0, buf.length, kt, packetNumber, mi.cb, mi.alreadyReportedBytes);
+                                       this.processOutgoingPreformatted(buf, 
0, buf.length, kt, packetNumber, mi.cb, mi.alreadyReportedBytes, 
mi.getPriority());
                                        //MARK: onSent()
                                        mi.onSent(buf.length + 
fullHeadersLengthOneMessage);
                                } catch (NotConnectedException e) {
@@ -2045,6 +2046,7 @@
                AsyncMessageCallback callbacks[] = new 
AsyncMessageCallback[callbacksCount];
                x=0;
                int alreadyReportedBytes = 0;
+               short priority = DMT.PRIORITY_BULK_DATA;
                for(int i=0;i<messages.length;i++) {
                        if(messages[i].formatted) continue;
                        if(messages[i].cb != null) {
@@ -2052,6 +2054,8 @@
                                System.arraycopy(messages[i].cb, 0, callbacks, 
x, messages[i].cb.length);
                                x += messages[i].cb.length;
                        }
+                       short messagePrio = messages[i].getPriority();
+                       if(messagePrio < priority) priority = messagePrio;
                }
                if(x != callbacksCount) throw new IllegalStateException();

@@ -2059,7 +2063,7 @@
                                (messageData.length < 256)) {
                        mi_name = null;
                        try {
-                               innerProcessOutgoing(messageData, 0, 
messageData.length, length, pn, neverWaitForPacketNumber, callbacks, 
alreadyReportedBytes);
+                               innerProcessOutgoing(messageData, 0, 
messageData.length, length, pn, neverWaitForPacketNumber, callbacks, 
alreadyReportedBytes, priority);
                                for(int i=0;i<messageData.length;i++) {
                                        MessageItem mi = newMsgs[i];
                                        mi_name = (mi.msg == null ? "(not a 
Message)" : mi.msg.getSpec().getName());
@@ -2107,7 +2111,8 @@
                                        if(lastIndex != i) {
                                                mi_name = null;
                                                try {
-                                                       
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
+                                                       // FIXME regenerate 
callbacks and priority!
+                                                       
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
                                                        for(int 
j=lastIndex;j<i;j++) {
                                                                MessageItem mi 
= newMsgs[j];
                                                                mi_name = 
(mi.msg == null ? "(not a Message)" : mi.msg.getSpec().getName());
@@ -2155,7 +2160,7 @@
         * @throws PacketSequenceException 
         */
        private void innerProcessOutgoing(byte[][] messageData, int start, int 
length, int bufferLength, 
-                       PeerNode pn, boolean neverWaitForPacketNumber, 
AsyncMessageCallback[] callbacks, int alreadyReportedBytes) throws 
NotConnectedException, WouldBlockException, PacketSequenceException {
+                       PeerNode pn, boolean neverWaitForPacketNumber, 
AsyncMessageCallback[] callbacks, int alreadyReportedBytes, short priority) 
throws NotConnectedException, WouldBlockException, PacketSequenceException {
                if(logMINOR) Logger.minor(this, 
"innerProcessOutgoing(...,"+start+ ',' +length+ ',' +bufferLength+ ')');
                byte[] buf = new byte[bufferLength];
                buf[0] = (byte)length;
@@ -2168,15 +2173,15 @@
                        System.arraycopy(data, 0, buf, loc, len);
                        loc += len;
                }
-               processOutgoingPreformatted(buf, 0, loc, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
+               processOutgoingPreformatted(buf, 0, loc, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
        }

        /* (non-Javadoc)
         * @see freenet.node.OutgoingPacketMangler#processOutgoing(byte[], int, 
int, freenet.node.KeyTracker, int)
         */
-       public void processOutgoing(byte[] buf, int offset, int length, 
KeyTracker tracker, int alreadyReportedBytes) throws KeyChangedException, 
NotConnectedException, PacketSequenceException, WouldBlockException {
+       public void processOutgoing(byte[] buf, int offset, int length, 
KeyTracker tracker, int alreadyReportedBytes, short priority) throws 
KeyChangedException, NotConnectedException, PacketSequenceException, 
WouldBlockException {
                byte[] newBuf = preformat(buf, offset, length);
-               processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, 
-1, null, alreadyReportedBytes);
+               processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, 
-1, null, alreadyReportedBytes, priority);
        }

        /**
@@ -2184,7 +2189,7 @@
         * the key changes.
         * @throws PacketSequenceException 
         */
-       void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[] 
callbacks, int alreadyReportedBytes) throws NotConnectedException, 
WouldBlockException, PacketSequenceException {
+       void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[] 
callbacks, int alreadyReportedBytes, short priority) throws 
NotConnectedException, WouldBlockException, PacketSequenceException {
                KeyTracker last = null;
                while(true) {
                        try {
@@ -2198,7 +2203,7 @@
                                }
                                int seqNo = neverWaitForPacketNumber ? 
tracker.allocateOutgoingPacketNumberNeverBlock() :
                                        tracker.allocateOutgoingPacketNumber();
-                               processOutgoingPreformatted(buf, offset, 
length, tracker, seqNo, callbacks, alreadyReportedBytes);
+                               processOutgoingPreformatted(buf, offset, 
length, tracker, seqNo, callbacks, alreadyReportedBytes, priority);
                                return;
                        } catch (KeyChangedException e) {
                                Logger.normal(this, "Key changed(2) for 
"+peer.getPeer());
@@ -2231,7 +2236,7 @@
        /* (non-Javadoc)
         * @see 
freenet.node.OutgoingPacketMangler#processOutgoingPreformatted(byte[], int, 
int, freenet.node.KeyTracker, int, freenet.node.AsyncMessageCallback[], int)
         */
-       public void processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks, 
int alreadyReportedBytes) throws KeyChangedException, NotConnectedException, 
PacketSequenceException, WouldBlockException {
+       public void processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks, 
int alreadyReportedBytes, short priority) throws KeyChangedException, 
NotConnectedException, PacketSequenceException, WouldBlockException {
                if(logMINOR) {
                        String log = 
"processOutgoingPreformatted("+Fields.hashCode(buf)+", "+offset+ ',' +length+ 
',' +tracker+ ',' +packetNumber+ ',';
                        if(callbacks == null) log += "null";
@@ -2436,7 +2441,7 @@
                if(seqNumber != -1) {
                        byte[] saveable = new byte[length];
                        System.arraycopy(buf, offset, saveable, 0, length);
-                       tracker.sentPacket(saveable, seqNumber, callbacks);
+                       tracker.sentPacket(saveable, seqNumber, callbacks, 
priority);
                }

                if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);
@@ -2545,7 +2550,7 @@
        }

        public void resend(ResendPacketItem item) throws 
PacketSequenceException, WouldBlockException, KeyChangedException, 
NotConnectedException {
-               processOutgoingPreformatted(item.buf, 0, item.buf.length, 
item.kt, item.packetNumber, item.callbacks, 0);
+               processOutgoingPreformatted(item.buf, 0, item.buf.length, 
item.kt, item.packetNumber, item.callbacks, 0, item.priority);
        }

        public int[] supportedNegTypes() {

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2008-01-12 21:24:29 UTC 
(rev 17034)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2008-01-12 21:54:21 UTC 
(rev 17035)
@@ -10,6 +10,7 @@

 import freenet.crypt.BlockCipher;
 import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.DMT;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.xfer.PacketThrottle;
 import freenet.support.DoublyLinkedList;
@@ -950,14 +951,14 @@
      * @param seqNumber The packet number.
      * @throws NotConnectedException 
      */
-    public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[] 
callbacks) throws NotConnectedException {
+    public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[] 
callbacks, short priority) throws NotConnectedException {
         if(callbacks != null) {
             for(int i=0;i<callbacks.length;i++) {
                 if(callbacks[i] == null)
                     throw new NullPointerException();
             }
         }
-        sentPacketsContents.add(seqNumber, data, callbacks);
+        sentPacketsContents.add(seqNumber, data, callbacks, priority);
         try {
                        queueAckRequest(seqNumber);
                } catch (UpdatableSortedLinkedListKilledException e) {
@@ -1007,7 +1008,7 @@
             AsyncMessageCallback[] callbacks = element.callbacks;
             // Ignore packet#
             if(logMINOR) Logger.minor(this, "Queueing resend of what was once 
"+element.packetNumber);
-            messages[i] = new MessageItem(buf, callbacks, true, 0, null);
+            messages[i] = new MessageItem(buf, callbacks, true, 0, null, 
element.priority);
         }
         pn.requeueMessageItems(messages, 0, messages.length, true);

@@ -1069,7 +1070,8 @@
                 continue; // acked already?
             }
             AsyncMessageCallback[] callbacks = 
sentPacketsContents.getCallbacks(packetNo);
-            rpiTemp.add(new ResendPacketItem(buf, packetNo, this, callbacks));
+            short priority = sentPacketsContents.getPriority(packetNo, 
DMT.PRIORITY_BULK_DATA);
+            rpiTemp.add(new ResendPacketItem(buf, packetNo, this, callbacks, 
priority));
         }
         if(rpiTemp.isEmpty()) return null;
         return numbers;

Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java     2008-01-12 21:24:29 UTC 
(rev 17034)
+++ trunk/freenet/src/freenet/node/MessageItem.java     2008-01-12 21:54:21 UTC 
(rev 17035)
@@ -21,6 +21,7 @@
      */
     final boolean formatted;
     final ByteCounter ctrCallback;
+    private final short priority;

     public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int 
alreadyReportedBytes, ByteCounter ctr) {
        this.alreadyReportedBytes = alreadyReportedBytes;
@@ -30,9 +31,10 @@
         formatted = false;
         this.ctrCallback = ctr;
         this.submitted = System.currentTimeMillis();
+        priority = msg2.getSpec().getPriority();
     }

-    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, int alreadyReportedBytes, ByteCounter ctr) {
+    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, int alreadyReportedBytes, ByteCounter ctr, short priority) {
        this.alreadyReportedBytes = alreadyReportedBytes;
         this.cb = cb2;
         this.msg = null;
@@ -42,6 +44,7 @@
                throw new NullPointerException();
         this.ctrCallback = ctr;
         this.submitted = System.currentTimeMillis();
+        this.priority = priority;
     }

     /**
@@ -74,6 +77,6 @@
        }

        public short getPriority() {
-               return msg.getSpec().getPriority();
+               return priority;
        }
 }

Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-01-12 
21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-01-12 
21:54:21 UTC (rev 17035)
@@ -40,7 +40,7 @@
         * @throws WouldBlockException 
         */
        public void processOutgoing(byte[] buf, int offset, int length,
-                       KeyTracker tracker, int alreadyReportedBytes)
+                       KeyTracker tracker, int alreadyReportedBytes, short 
priority)
                        throws KeyChangedException, NotConnectedException,
                        PacketSequenceException, WouldBlockException;

@@ -62,7 +62,7 @@
         */
        public void processOutgoingPreformatted(byte[] buf, int offset, int 
length,
                        KeyTracker tracker, int packetNumber,
-                       AsyncMessageCallback[] callbacks, int 
alreadyReportedBytes)
+                       AsyncMessageCallback[] callbacks, int 
alreadyReportedBytes, short priority)
                        throws KeyChangedException, NotConnectedException,
                        PacketSequenceException, WouldBlockException;


Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-01-12 21:24:29 UTC 
(rev 17034)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-01-12 21:54:21 UTC 
(rev 17035)
@@ -2213,7 +2213,7 @@
                        long t = tracker.getNextUrgentTime();
                        if(t < now || forceSendPrimary) {
                                try {
-                                       outgoingMangler.processOutgoing(null, 
0, 0, tracker, 0);
+                                       outgoingMangler.processOutgoing(null, 
0, 0, tracker, 0, DMT.PRIORITY_NOW);
                                } catch(NotConnectedException e) {
                                // Ignore
                                } catch(KeyChangedException e) {
@@ -2228,7 +2228,7 @@
                        long t = tracker.getNextUrgentTime();
                        if(t < now)
                                try {
-                                       outgoingMangler.processOutgoing(null, 
0, 0, tracker, 0);
+                                       outgoingMangler.processOutgoing(null, 
0, 0, tracker, 0, DMT.PRIORITY_NOW);
                                } catch(NotConnectedException e) {
                                // Ignore
                                } catch(KeyChangedException e) {
@@ -2412,7 +2412,7 @@
                                Logger.error(this, "No tracker to resend packet 
" + item.packetNumber + " on");
                                continue;
                        }
-                       MessageItem mi = new MessageItem(item.buf, 
item.callbacks, true, 0, null);
+                       MessageItem mi = new MessageItem(item.buf, 
item.callbacks, true, 0, null, item.priority);
                        requeueMessageItems(new MessageItem[]{mi}, 0, 1, true);
                }
        }

Modified: trunk/freenet/src/freenet/node/ResendPacketItem.java
===================================================================
--- trunk/freenet/src/freenet/node/ResendPacketItem.java        2008-01-12 
21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/node/ResendPacketItem.java        2008-01-12 
21:54:21 UTC (rev 17035)
@@ -10,16 +10,18 @@
  * message as byte[].
  */
 public class ResendPacketItem {
-    public ResendPacketItem(byte[] payload, int packetNumber, KeyTracker k, 
AsyncMessageCallback[] callbacks) {
+    public ResendPacketItem(byte[] payload, int packetNumber, KeyTracker k, 
AsyncMessageCallback[] callbacks, short priority) {
         pn = k.pn;
         kt = k;
         buf = payload;
         this.packetNumber = packetNumber;
         this.callbacks = callbacks;
+        this.priority = priority;
     }
     final PeerNode pn;
     final KeyTracker kt;
     final byte[] buf;
     final int packetNumber;
-    final AsyncMessageCallback[] callbacks;        
+    final AsyncMessageCallback[] callbacks;
+    final short priority;
 }
\ No newline at end of file

Modified: trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java
===================================================================
--- trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java  
2008-01-12 21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java  
2008-01-12 21:54:21 UTC (rev 17035)
@@ -66,6 +66,14 @@
         else return -1;
     }

+       public short getPriority(int index, short defaultValue) {
+        Integer i = new Integer(index);
+        LimitedRangeIntByteArrayMapElement wrapper = 
(LimitedRangeIntByteArrayMapElement) contents.get(i);
+        if(wrapper != null)
+            return wrapper.priority;
+        else return defaultValue;
+       }
+       
     /**
      * Get the time at which an index was re-added last.
      */
@@ -82,7 +90,7 @@
      * @return True if we succeeded, false if the index was out
      * of range.
      */
-    public synchronized boolean add(int index, byte[] data, 
AsyncMessageCallback[] callbacks) {
+    public synchronized boolean add(int index, byte[] data, 
AsyncMessageCallback[] callbacks, short priority) {
        logMINOR = Logger.shouldLog(Logger.MINOR, this);
        if(logMINOR) Logger.minor(this, toString()+" add "+index);
         if(maxValue == -1) {
@@ -103,7 +111,7 @@
         Integer i = new Integer(index);
         LimitedRangeIntByteArrayMapElement le = 
(LimitedRangeIntByteArrayMapElement) contents.get(i);
         if(le == null)
-               contents.put(new Integer(index), new 
LimitedRangeIntByteArrayMapElement(index, data, callbacks));
+               contents.put(new Integer(index), new 
LimitedRangeIntByteArrayMapElement(index, data, callbacks, priority));
         else
                le.reput();
         notifyAll();

Modified: 
trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java
===================================================================
--- trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java   
2008-01-12 21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java   
2008-01-12 21:54:21 UTC (rev 17035)
@@ -5,10 +5,11 @@

 public class LimitedRangeIntByteArrayMapElement {

-    public LimitedRangeIntByteArrayMapElement(int packetNumber, byte[] data2, 
AsyncMessageCallback[] callbacks2) {
+    public LimitedRangeIntByteArrayMapElement(int packetNumber, byte[] data2, 
AsyncMessageCallback[] callbacks2, short priority) {
         this.packetNumber = packetNumber;
         this.data = data2;
         this.callbacks = callbacks2;
+        this.priority = priority;
         createdTime = System.currentTimeMillis();
     }

@@ -16,6 +17,7 @@
     public final byte[] data;
     public final AsyncMessageCallback[] callbacks;
     public final long createdTime;
+    public final short priority;
     long reputTime;

        public void reput() {


Reply via email to