Author: nextgens
Date: 2006-07-09 22:11:57 +0000 (Sun, 09 Jul 2006)
New Revision: 9529

Modified:
   trunk/freenet/src/freenet/node/CHKInsertSender.java
Log:
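Fixes a bug in CHKInsertSender (a missing return after finish(INTERNAL_ERROR) on an unknown reply let execution fall through to finish(SUCCESS)) and possibly fixes some locking issues as well.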

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-09 21:51:04 UTC (rev 9528)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-09 22:11:57 UTC (rev 9529)
@@ -52,17 +52,17 @@
                /** Have we received notice of the downstream success
                 * or failure of dependant transfers from that node?
                 * Includes timing out. */
-               boolean receivedCompletionNotice = false;
+               boolean receivedCompletionNotice;
                /** Timed out - didn't receive completion notice in
                 * the allotted time?? */
-               boolean completionTimedOut = false;
+               boolean completionTimedOut;
                /** Was the notification of successful transfer? */
                boolean completionSucceeded;

                /** Have we completed the immediate transfer? */
-               boolean completedTransfer = false;
+               boolean completedTransfer;
                /** Did it succeed? */
-               boolean transferSucceeded = false;
+               boolean transferSucceeded;

                AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
                        this.pn = pn;
@@ -151,7 +151,7 @@
     final byte[] headers; // received BEFORE creation => we handle Accepted 
elsewhere
     final PartiallyReceivedBlock prb;
     final boolean fromStore;
-    private boolean receiveFailed = false;
+    private boolean receiveFailed;
     final double closestLocation;
     final long startTime;
     private boolean sentRequest;
@@ -161,13 +161,13 @@
     private Vector nodesWaitingForCompletion;

     /** Have all transfers completed and all nodes reported completion status? 
*/
-    private boolean allTransfersCompleted = false;
+    private boolean allTransfersCompleted;

     /** Has a transfer timed out, either directly or downstream? */
-    private boolean transferTimedOut = false;
+    private boolean transferTimedOut;

     /** Runnable which waits for completion of all transfers */
-    private CompletionWaiter cw = null;
+    private CompletionWaiter cw;

     /** Time at which we set status to a value other than NOT_FINISHED */
     private long setStatusTime = -1;
@@ -193,7 +193,7 @@
         return super.toString()+" for "+uid;
     }

-    public void run() {
+    public synchronized void run() {
         short origHTL = htl;
         node.addInsertSender(myKey, htl, this);
         try {
@@ -215,12 +215,13 @@
         while(true) {
             if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler

-            if(htl == 0) {
-                // Send an InsertReply back
-                finish(SUCCESS, null);
-                return;
+            synchronized (this) {
+               if(htl == 0) {
+                       // Send an InsertReply back
+                       finish(SUCCESS, null);
+                       return;
+               }
             }
-            
             // Route it
             PeerNode next;
             // Can backtrack, so only route to nodes closer than we are to 
target.
@@ -239,13 +240,15 @@
             Logger.minor(this, "Routing insert to "+next);
             nodesRoutedTo.add(next);

-            if(PeerManager.distance(target, nextValue) > 
PeerManager.distance(target, closestLocation)) {
-                Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
-                htl = node.decrementHTL(source, htl);
+            Message req;
+            synchronized (this) {
+               if(PeerManager.distance(target, nextValue) > 
PeerManager.distance(target, closestLocation)) {
+                       Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
+                       htl = node.decrementHTL(source, htl);
+               }
+
+               req = DMT.createFNPInsertRequest(uid, htl, myKey, 
closestLocation);
             }
-            
-            Message req = DMT.createFNPInsertRequest(uid, htl, myKey, 
closestLocation);
-            
             // Wait for ack or reject... will come before even a locally 
generated DataReply

             MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
@@ -266,7 +269,9 @@
                                Logger.minor(this, "Not connected to "+next);
                                continue;
                        }
-            sentRequest = true;
+                       synchronized (this) {
+                               sentRequest = true;                             
+                       }

             if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler
             Message msg = null;
@@ -277,7 +282,7 @@
              * followed by an Accepted. So we must loop here.
              */

-            while (true) {
+            while (msg==null || (msg.getSpec() != DMT.FNPAccepted)) {

                                try {
                                        msg = node.usm.waitFor(mf, this);
@@ -326,7 +331,6 @@
                                        break;
                                }
                                // Otherwise is an FNPAccepted
-                               break;
                        }

             if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;
@@ -421,8 +425,10 @@
                                if (msg.getSpec() == DMT.FNPRouteNotFound) {
                                        Logger.minor(this, "Rejected: RNF");
                                        short newHtl = msg.getShort(DMT.HTL);
-                                       if (htl > newHtl)
-                                               htl = newHtl;
+                                       synchronized (this) {
+                                               if (htl > newHtl)
+                                                       htl = newHtl;           
                                
+                                       }
                                        // Finished as far as this node is 
concerned
                                        next.successNotOverload();
                                        break;
@@ -486,17 +492,18 @@
                                if (msg.getSpec() != DMT.FNPInsertReply) {
                                        Logger.error(this, "Unknown reply: " + 
msg);
                                        finish(INTERNAL_ERROR, next);
+                                       return;
+                               }else{
+                                       // Our task is complete
+                                       next.successNotOverload();
+                                       finish(SUCCESS, next);
+                                       return;
                                }
-                               
-                               // Our task is complete
-                               next.successNotOverload();
-                               finish(SUCCESS, next);
-                               return;
                        }
                }
        }

-       private boolean hasForwardedRejectedOverload = false;
+       private boolean hasForwardedRejectedOverload;

     synchronized boolean receivedRejectedOverload() {
        return hasForwardedRejectedOverload;
@@ -513,11 +520,11 @@

     private void finish(int code, PeerNode next) {
         Logger.minor(this, "Finished: "+code+" on "+this, new 
Exception("debug"));
-        if(status != NOT_FINISHED)
-               throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
-
+       
         synchronized(this) {
-        
+                if(status != NOT_FINISHED)
+               throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
+               
                setStatusTime = System.currentTimeMillis();

                if(code == ROUTE_NOT_FOUND && !sentRequest)
@@ -559,11 +566,11 @@
         Logger.minor(this, "Returning from finish()");
     }

-    public int getStatus() {
+    public synchronized int getStatus() {
         return status;
     }

-    public short getHTL() {
+    public synchronized short getHTL() {
         return htl;
     }

@@ -578,7 +585,7 @@
     /**
      * @return The current status as a string
      */
-    public String getStatusString() {
+    public synchronized String getStatusString() {
         if(status == SUCCESS)
             return "SUCCESS";
         if(status == ROUTE_NOT_FOUND)
@@ -596,18 +603,19 @@
         return "UNKNOWN STATUS CODE: "+status;
     }

-       public boolean sentRequest() {
+       public synchronized boolean sentRequest() {
                return sentRequest;
        }

        private void makeCompletionWaiter() {
+               Thread t;
                synchronized(this) {
                        if(cw == null)
                                cw = new CompletionWaiter();
                        else
                                return;
+                       t = new Thread(cw, "Completion waiter for "+uid);
                }
-               Thread t = new Thread(cw, "Completion waiter for "+uid);
                t.setDaemon(true);
                t.start();
        }
@@ -813,7 +821,7 @@
                }
        }

-       public boolean completed() {
+       public synchronized boolean completed() {
                return allTransfersCompleted;
        }



Reply via email to