Author: j16sdiz
Date: 2008-12-18 04:26:12 +0000 (Thu, 18 Dec 2008)
New Revision: 24503

Modified:
   trunk/plugins/XMLSpider/XMLSpider.java
Log:
Run make-index and page-callback on the same (single) thread.
This reduces thread contention and is actually faster.

Modified: trunk/plugins/XMLSpider/XMLSpider.java
===================================================================
--- trunk/plugins/XMLSpider/XMLSpider.java      2008-12-18 00:57:53 UTC (rev 
24502)
+++ trunk/plugins/XMLSpider/XMLSpider.java      2008-12-18 04:26:12 UTC (rev 
24503)
@@ -16,6 +16,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,7 +27,7 @@
 import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,7 +69,6 @@
 import freenet.keys.FreenetURI;
 import freenet.keys.USK;
 import freenet.node.NodeClientCore;
-import freenet.node.PrioRunnable;
 import freenet.node.RequestStarter;
 import freenet.pluginmanager.FredPlugin;
 import freenet.pluginmanager.FredPluginHTTP;
@@ -80,7 +80,6 @@
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
 import freenet.support.api.HTTPRequest;
-import freenet.support.io.NativeThread;
 import freenet.support.io.NullBucketFactory;
 
 /**
@@ -354,11 +353,7 @@
                }
 
                public void onSuccess(final FetchResult result, final 
ClientGetter state) {
-                       callbackExecutor.execute(new Runnable() {
-                               public void run() {
-                                       XMLSpider.this.onSuccess(result, state, 
page);
-                               }
-                       });
+                       callbackExecutor.execute(new OnSucessCallback(result, 
state, page));
                }
 
                public void onSuccess(BaseClientPutter state) {
@@ -377,11 +372,51 @@
                return getter;
        }
 
+
+       protected class OnSucessCallback implements Runnable { // Runnable that holds one fetch result and forwards it to onSuccess(result, state, page)
+               private FetchResult result;
+               private ClientGetter state;
+               private Page page;
+
+               OnSucessCallback(FetchResult result, ClientGetter state, Page 
page) {
+                       this.result = result;
+                       this.state = state;
+                       this.page = page;
+               }
+
+               public void run() { // passes the three stored constructor arguments on unchanged
+                       onSuccess(result, state, page); // no onSuccess is declared in this class, so this is the enclosing class's method
+               }
+       }
+
+       protected class MakeIndexCallback implements Runnable { // Runnable wrapper around makeIndex()
+               public void run() {
+                       try {
+                               makeIndex();
+                       } catch (Exception e) { // any Exception from makeIndex() is logged here rather than rethrown
+                               Logger.error(this, "Could not generate index: 
"+e, e);
+                       }
+               }
+       }
+
+       /** Comparator that sorts a MakeIndexCallback before any other kind of Runnable. */
+       protected class CallbackPrioritizer implements Comparator<Runnable> {
+               public int compare(Runnable o1, Runnable o2) {
+                       if (o1.getClass() == o2.getClass())
+                               return 0; // same kind of task: no preference between them
+                       // MakeIndexCallback always has higher priority, i.e. compares as smaller
+                       if (o1 instanceof MakeIndexCallback)
+                               return -1;
+                       if (o2 instanceof MakeIndexCallback)
+                               return 1; // mirror of the case above: sgn(compare(a,b)) must equal -sgn(compare(b,a))
+                       return 0;
+               }
+       }
+
        // this is java.util.concurrent.Executor, not freenet.support.Executor
-       // allow limiting the number of thread running
+       // always run with one thread --> more threads cause contention and are slower!
        protected Executor callbackExecutor = new ThreadPoolExecutor( //
-               0, Runtime.getRuntime().availableProcessors(), 10 * 60, 
TimeUnit.SECONDS, //
-               new LinkedBlockingQueue<Runnable>());
+               1, 1, 600, TimeUnit.SECONDS, new 
PriorityBlockingQueue<Runnable>(5, new CallbackPrioritizer()));
        
        /**
         * Processes the successfully fetched uri for further outlinks.
@@ -390,16 +425,9 @@
         * @param state
         * @param page
         */
+       // single threaded
        protected void onSuccess(FetchResult result, ClientGetter state, Page 
page) {
                synchronized (this) {
-                       while ((writingIndex || writeIndexScheduled) && 
!stopped) {
-                               try {
-                                       wait();
-                               } catch (InterruptedException e) {
-                                       return;
-                               }
-                       }
-
                        if (stopped)
                                return;                                 
                }
@@ -1372,13 +1400,6 @@
                }
 
                try {
-                       synchronized(this) {
-                               if(!mustWriteIndex) {
-                                       Logger.minor(this, "Not making index, 
no data added since last time");
-                                       return;
-                               }
-                               mustWriteIndex = false;
-                       }
                        time_taken = System.currentTimeMillis();
 
                        makeSubIndices();
@@ -1403,19 +1424,7 @@
                if (writeIndexScheduled || writingIndex)
                        return;
                
-               core.getTicker().queueTimedJob(new PrioRunnable() {
-                       public void run() {
-                               try {
-                                       makeIndex();
-                               } catch (Exception e) {
-                                       Logger.error(this, "Could not generate 
index: "+e, e);
-                               }
-                       }
-
-                       public int getPriority() {
-                               return NativeThread.LOW_PRIORITY;
-                       }
-               }, 60 * 1000); // wait 1 minute for cool down
+               callbackExecutor.execute(new MakeIndexCallback());
                writeIndexScheduled = true;
        }
 

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to