Author: toad
Date: 2008-08-13 12:14:42 +0000 (Wed, 13 Aug 2008)
New Revision: 21799

Modified:
   branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
Log:
Don't finish by size; finish when both blocksNotStarted and blocksStarted are empty.
Add a block to blocksStarted when it is started.
Remove blocks from blocksFinished in finish() as they are handled.
Logging. Time out 20 minutes after selection: if the request has not finished by then, log an error.
Log an error if finish() is called twice.


Modified: 
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-08-13 12:13:04 UTC (rev 21798)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-08-13 12:14:42 UTC (rev 21799)
@@ -39,13 +39,13 @@
        public transient final boolean localRequestOnly;
        public transient final boolean cacheLocalRequests;
        public transient final boolean ignoreStore;
-       public transient final int size;
        public transient final ArrayList<PersistentChosenBlock> 
blocksNotStarted;
        public transient final ArrayList<PersistentChosenBlock> blocksStarted;
        public transient final ArrayList<PersistentChosenBlock> blocksFinished;
        public final RequestScheduler scheduler;
        public final SendableRequestSender sender;
        private boolean logMINOR;
+       private boolean finished;

        PersistentChosenRequest(SendableRequest req, short prio, int 
retryCount, ObjectContainer container, RequestScheduler sched, ClientContext 
context) {
                request = req;
@@ -76,7 +76,19 @@
                sender = req.getSender(container, context);
                if(!reqActive)
                        container.deactivate(req, 1);
-               size = blocksNotStarted.size();
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               
+               // Remove debugging check? How reasonable is the timeout for 
inserts??
+               context.ticker.queueTimedJob(new Runnable() {
+
+                       public void run() {
+                               synchronized(PersistentChosenRequest.this) {
+                                       if(finished) return;
+                                       Logger.error(this, "Still not finished 
after timeout: "+this);
+                               }
+                       }
+                       
+               }, 20*60*1000);
        }

        void onFinished(PersistentChosenBlock block, ClientContext context) {
@@ -104,9 +116,9 @@
                                        return;
                                }
                        blocksFinished.add(block);
-                       if(blocksFinished.size() < size) {
+                       if(!(blocksNotStarted.isEmpty() && 
blocksStarted.isEmpty())) {
                                if(logMINOR)
-                                       Logger.minor(this, "Blocks finished: 
"+blocksFinished.size()+" of "+size+" on "+this+" for "+request);
+                                       Logger.minor(this, "Not finishing yet: 
blocks not started: "+blocksNotStarted.size()+" started: 
"+blocksStarted.size()+" finished: "+blocksFinished.size());
                                return;
                        }
                }
@@ -114,19 +126,41 @@
                context.jobRunner.queue(new DBJob() {

                        public void run(ObjectContainer container, 
ClientContext context) {
-                               finish(container, context);
+                               finish(container, context, false);
                        }

                }, NativeThread.NORM_PRIORITY + 1, false);
        }

-       private void finish(ObjectContainer container, ClientContext context) {
-               Logger.error(this, "Finishing "+this+" for "+request);
+       private void finish(ObjectContainer container, ClientContext context, 
boolean dumping) {
+               container.activate(request, 1);
+               Logger.normal(this, "Finishing "+this+" for "+request);
                // Call all the callbacks.
+               PersistentChosenBlock[] finishedBlocks;
+               synchronized(this) {
+                       if(finished) {
+                               if(blocksFinished.isEmpty()) {
+                                       // Okay...
+                                       return;
+                               } else {
+                                       Logger.error(this, "Finished but 
blocksFinished not empty on "+this, new Exception("debug"));
+                                       // Process the blocks...
+                               }
+                       }
+                       finished = true;
+                       finishedBlocks = blocksFinished.toArray(new 
PersistentChosenBlock[blocksFinished.size()]);
+               }
+               if(finishedBlocks.length == 0) {
+                       if(!dumping)
+                               Logger.error(this, "No finished blocks in 
finish() on "+this);
+                       else if(logMINOR)
+                               Logger.minor(this, "No finished blocks in 
finish() on "+this);
+                       return;
+               }
                if(request instanceof SendableGet) {
                        boolean supportsBulk = request instanceof 
SupportsBulkCallFailure;
                        Vector<BulkCallFailureItem> bulkFailItems = null;
-                       for(PersistentChosenBlock block : blocksFinished) {
+                       for(PersistentChosenBlock block : finishedBlocks) {
                                if(!block.fetchSucceeded()) {
                                        LowLevelGetException e = 
block.failedGet();
                                        if(supportsBulk) {
@@ -146,7 +180,7 @@
                                container.commit(); // db4o is read-committed, 
so we need to commit here.
                        }
                } else /*if(request instanceof SendableInsert)*/ {
-                       for(PersistentChosenBlock block : blocksFinished) {
+                       for(PersistentChosenBlock block : finishedBlocks) {
                                container.activate(request, 1);
                                if(block.insertSucceeded()) {
                                        
((SendableInsert)request).onSuccess(block.token, container, context);
@@ -163,8 +197,11 @@
        public synchronized ChosenBlock grabNotStarted(Random random) {
                int size = blocksNotStarted.size();
                if(size == 0) return null;
-               if(size == 1) return blocksNotStarted.remove(0);
-               return blocksNotStarted.remove(random.nextInt(size));
+               PersistentChosenBlock ret;
+               if(size == 1) ret = blocksNotStarted.remove(0);
+               else ret = blocksNotStarted.remove(random.nextInt(size));
+               blocksStarted.add(ret);
+               return ret;
        }

        public synchronized int sizeNotStarted() {
@@ -184,7 +221,7 @@
                }
                if(!wasStarted) {
                        if(logMINOR) Logger.minor(this, "Finishing immediately 
in onDumped() as nothing pending: "+this);
-                       finish(container, core.sched.clientContext);
+                       finish(container, core.sched.clientContext, true);
                }
        }
 }


Reply via email to