Author: toad
Date: 2009-02-03 19:12:22 +0000 (Tue, 03 Feb 2009)
New Revision: 25507

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
   branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Prevent the same transient insert from being selected more than once.


Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  2009-02-03 19:12:22 UTC (rev 25507)
@@ -45,7 +45,7 @@
 public class ClientRequestScheduler implements RequestScheduler {
        
        private final ClientRequestSchedulerCore schedCore;
-       private final ClientRequestSchedulerNonPersistent schedTransient;
+       final ClientRequestSchedulerNonPersistent schedTransient;
        
        private static boolean logMINOR;
        
@@ -1028,6 +1028,10 @@
        public void removeFetchingKey(Key key) {
                schedCore.removeFetchingKey(key);
        }
+
+       public void removeTransientInsertFetching(SendableInsert insert, Object 
token) {
+               schedCore.removeTransientInsertFetching(insert, token);
+       }
        
        /**
         * Map from SendableGet implementing SupportsBulkCallFailure to 
BulkCallFailureItem[].
@@ -1086,6 +1090,10 @@
                return schedCore.addToFetching(key);
        }
        
+       public boolean addTransientInsertFetching(SendableInsert insert, Object 
token) {
+               return schedCore.addTransientInsertFetching(insert, token);
+       }
+       
        public boolean hasFetchingKey(Key key) {
                return schedCore.hasKey(key);
        }

Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java      2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java      2009-02-03 19:12:22 UTC (rev 25507)
@@ -20,6 +20,7 @@
 import freenet.node.Node;
 import freenet.node.RequestStarter;
 import freenet.node.SendableGet;
+import freenet.node.SendableInsert;
 import freenet.node.SendableRequest;
 import freenet.support.Logger;
 import freenet.support.PrioritizedSerialExecutor;
@@ -54,6 +55,30 @@
         */
        private transient HashSet keysFetching;
        
+       private class RunningTransientInsert {
+               
+               final SendableInsert insert;
+               final Object token;
+               
+               RunningTransientInsert(SendableInsert i, Object t) {
+                       insert = i;
+                       token = t;
+               }
+               
+               public int hashCode() {
+                       return insert.hashCode() ^ token.hashCode();
+               }
+               
+               public boolean equals(Object o) {
+                       if(!(o instanceof RunningTransientInsert)) return false;
+                       RunningTransientInsert r = (RunningTransientInsert) o;
+                       return r.insert == insert && (r.token == token || 
r.token.equals(token));
+               }
+               
+       }
+       
+       private transient HashSet<RunningTransientInsert> 
runningTransientInserts;
+       
        public final byte[] globalSalt;
        
        /**
@@ -114,10 +139,13 @@
                this.sched = sched;
                this.initTime = System.currentTimeMillis();
                // We DO NOT want to rerun the query after consuming the 
initial set...
-               if(!isInsertScheduler)
+               if(!isInsertScheduler) {
                        keysFetching = new HashSet();
-               else
+                       runningTransientInserts = null;
+               } else {
                        keysFetching = null;
+                       runningTransientInserts = new 
HashSet<RunningTransientInsert>();
+               }
                if(isInsertScheduler) {
                preRegisterMeRunner = new DBJob() {
 
@@ -716,5 +744,35 @@
                return ret + cooldown;
        }
 
+       public boolean hasTransientInsert(SendableInsert insert, Object token) {
+               RunningTransientInsert tmp = new RunningTransientInsert(insert, 
token);
+               synchronized(runningTransientInserts) {
+                       return runningTransientInserts.contains(tmp);
+               }
+       }
+
+       public boolean addTransientInsertFetching(SendableInsert insert, Object 
token) {
+               RunningTransientInsert tmp = new RunningTransientInsert(insert, 
token);
+               synchronized(runningTransientInserts) {
+                       boolean retval = runningTransientInserts.add(tmp);
+                       if(!retval) {
+                               Logger.normal(this, "Already in 
runningTransientInserts: "+insert+" : "+token);
+                       } else {
+                               if(logMINOR)
+                                       Logger.minor(this, "Added to 
runningTransientInserts: "+insert+" : "+token);
+                       }
+                       return retval;
+               }
+       }
+       
+       public void removeTransientInsertFetching(SendableInsert insert, Object 
token) {
+               RunningTransientInsert tmp = new RunningTransientInsert(insert, 
token);
+               if(logMINOR)
+                       Logger.minor(this, "Removing from 
runningTransientInserts: "+insert+" : "+token);
+               synchronized(runningTransientInserts) {
+                       runningTransientInserts.remove(tmp);
+               }
+       }
+
 }
 

Modified: branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java     2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java     2009-02-03 19:12:22 UTC (rev 25507)
@@ -464,7 +464,12 @@
        public synchronized Object chooseKey(KeysFetchingLocally ignored, 
ObjectContainer container, ClientContext context) {
                if(finished) return null;
                // Ignore KeysFetchingLocally, it's for requests.
-               return getBlock(container, context, false);
+               Object ret = getBlock(container, context, false);
+               if(!persistent) {
+                       if(ignored.hasTransientInsert(this, ret))
+                               return null;
+               }
+               return ret;
        }
 
        @Override

Modified: branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java     2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java     2009-02-03 19:12:22 UTC (rev 25507)
@@ -9,5 +9,13 @@
         * LOCKING: This should be safe just about anywhere, the lock 
protecting it is always taken last.
         */
        public boolean hasKey(Key key);
+       
+       /**
+        * Is this request:token pair being executed? This applies only to
+        * non-persistent inserts, because persistent requests are selected on
+        * a request level, and requests use hasKey(). Also, activation issues
+        * with SendableRequest meaning getting a hash code would be 
problematic!
+        */
+       public boolean hasTransientInsert(SendableInsert insert, Object token);
 
 }

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        2009-02-03 19:12:22 UTC (rev 25507)
@@ -79,4 +79,8 @@
 
        public void start(NodeClientCore core);
 
+       public boolean addTransientInsertFetching(SendableInsert insert, Object 
token);
+
+       public void removeTransientInsertFetching(SendableInsert insert, Object 
token);
+
 }

Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java  2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java  2009-02-03 19:12:22 UTC (rev 25507)
@@ -7,6 +7,7 @@
 
 import freenet.client.async.ChosenBlock;
 import freenet.client.async.ClientContext;
+import freenet.client.async.TransientChosenBlock;
 import freenet.keys.Key;
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
@@ -185,6 +186,9 @@
                if(req.key != null) {
                        if(!sched.addToFetching(req.key))
                                return false;
+               } else if((!req.isPersistent()) && 
((TransientChosenBlock)req).request instanceof SendableInsert) {
+                       
if(!sched.addTransientInsertFetching((SendableInsert)(((TransientChosenBlock)req).request),
 req.token))
+                               return false;
                }
                if(logMINOR) Logger.minor(this, "Running request "+req+" 
priority "+req.getPriority());
                core.getExecutor().execute(new SenderThread(req, req.key), 
"RequestStarter$SenderThread for "+req);
@@ -230,6 +234,9 @@
                                Logger.minor(this, "Finished "+req);
                        } finally {
                                if(key != null) sched.removeFetchingKey(key);
+                               else if((!req.isPersistent()) && 
((TransientChosenBlock)req).request instanceof SendableInsert)
+                                       
sched.removeTransientInsertFetching((SendableInsert)(((TransientChosenBlock)req).request),
 req.token);
+
                        }
                }
                

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to