Author: toad
Date: 2008-10-23 17:41:39 +0000 (Thu, 23 Oct 2008)
New Revision: 23050

Added:
   
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegmentRegisterJob.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
Log:
Reduce memory usage on inserts:
- Register each segment in a separate database job. Queue these jobs in RAM and
also on the restart jobs queue, and remove each one from the latter once it has executed.
- Deactivation: avoid having all the segments activated at once.


Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java       
2008-10-23 17:36:14 UTC (rev 23049)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java       
2008-10-23 17:41:39 UTC (rev 23050)
@@ -4,6 +4,7 @@
 package freenet.client.async;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Vector;

 import com.db4o.ObjectContainer;
@@ -22,6 +23,7 @@
 import freenet.support.api.Bucket;
 import freenet.support.compress.Compressor;
 import freenet.support.io.BucketTools;
+import freenet.support.io.NativeThread;

 public class SplitFileInserter implements ClientPutState {

@@ -271,7 +273,8 @@
                for(int i=0;i<segments.length;i++) {
                        if(persistent)
                                container.activate(segments[i], 1);
-                       segments[i].start(container, context);
+                       SplitFileInserterSegmentRegisterJob segJob = new 
SplitFileInserterSegmentRegisterJob(segments[i], NativeThread.NORM_PRIORITY);
+                       segJob.schedule(container, context, 
NativeThread.NORM_PRIORITY);
                        if(persistent)
                                container.deactivate(segments[i], 1);
                }
@@ -297,18 +300,16 @@
                                }
                                if((segments[i] == null) || 
!segments[i].isEncoded()) {
                                        ret = true;
+                                       if(segments[i] != segment && persistent)
+                                               
container.deactivate(segments[i], 1);
                                        break;
                                }
-                       }
-               }
-               if(encode) segment.forceEncode(container, context);
-               if(ret) {
-                       for(int i=0;i<segments.length;i++) {
-                               if(segments[i] != segment)
+                               if(segments[i] != segment && persistent)
                                        container.deactivate(segments[i], 1);
                        }
-                       return;
                }
+               if(encode) segment.forceEncode(container, context);
+               if(ret) return;
                if(persistent)
                        container.activate(cb, 1);
                cb.onBlockSetFinished(this, container, context);
@@ -331,14 +332,11 @@
                        for(int i=0;i<segments.length;i++) {
                                if(persistent)
                                        container.activate(segments[i], 1);
-                               if(!segments[i].hasURIs()) {
+                               boolean hasURIs = segments[i].hasURIs();
+                               if(persistent && segments[i] != segment)
+                                       container.deactivate(segments[i], 1);
+                               if(!hasURIs) {
                                        if(logMINOR) Logger.minor(this, 
"Segment does not have URIs: "+segments[i]);
-                                       if(persistent) {
-                                               for(int j=0;j<i;j++) {
-                                                       if(segments[j] == 
segment) continue;
-                                                       
container.deactivate(segments[j], 1);
-                                               }
-                                       }
                                        return;
                                }
                        }
@@ -351,12 +349,24 @@
        private void encodeMetadata(ObjectContainer container, ClientContext 
context, SplitFileInserterSegment dontDeactivateSegment) {
                boolean missingURIs;
                Metadata m = null;
-               if(persistent)
-                       activateSegments(container, dontDeactivateSegment);
+               ClientCHK[] dataURIs = new ClientCHK[countDataBlocks];
+               ClientCHK[] checkURIs = new ClientCHK[countCheckBlocks];
                synchronized(this) {
+                       int dpos = 0;
+                       int cpos = 0;
+                       for(int i=0;i<segments.length;i++) {
+                               if(persistent)
+                                       container.activate(segments[i], 1);
+                               ClientCHK[] data = segments[i].getDataCHKs();
+                               System.arraycopy(data, 0, dataURIs, dpos, 
data.length);
+                               dpos += data.length;
+                               ClientCHK[] check = segments[i].getCheckCHKs();
+                               System.arraycopy(check, 0, checkURIs, cpos, 
check.length);
+                               cpos += check.length;
+                               if(persistent && segments[i] != 
dontDeactivateSegment)
+                                       container.deactivate(segments[i], 1);
+                       }
                        // Create metadata
-                       ClientCHK[] dataURIs = getDataCHKs(container);
-                       ClientCHK[] checkURIs = getCheckCHKs(container);

                        if(logMINOR) Logger.minor(this, "Data URIs: 
"+dataURIs.length+", check URIs: "+checkURIs.length);

@@ -368,8 +378,6 @@
                        }
                        haveSentMetadata = true;
                }
-               if(persistent)
-                       deactivateSegments(container, dontDeactivateSegment);
                if(missingURIs) {
                        if(logMINOR) Logger.minor(this, "Missing URIs");
                        // Error
@@ -384,20 +392,6 @@
                }
        }

-       private void activateSegments(ObjectContainer container, 
SplitFileInserterSegment notMe) {
-               for(int i=0;i<segments.length;i++) {
-                       if(segments[i] == notMe) continue;
-                       container.activate(segments[i], 1);
-               }
-       }
-
-       private void deactivateSegments(ObjectContainer container, 
SplitFileInserterSegment notMe) {
-               for(int i=0;i<segments.length;i++) {
-                       if(segments[i] == notMe) continue;
-                       container.deactivate(segments[i], 1);
-               }
-       }
-
        private void fail(InsertException e, ObjectContainer container, 
ClientContext context) {
                synchronized(this) {
                        if(finished) return;
@@ -420,64 +414,6 @@
                return false;
        }

-       /**
-        * Segments MUST BE ACTIVATED TO DEPTH 1 BEFORE CALLING,
-        * and should be deactivated by caller.
-        * @param container
-        * @return
-        */
-       private ClientCHK[] getCheckCHKs(ObjectContainer container) {
-               // Copy check blocks from each segment into a FreenetURI[].
-               ClientCHK[] uris = new ClientCHK[countCheckBlocks];
-               int x = 0;
-               for(int i=0;i<segments.length;i++) {
-                       ClientCHK[] segURIs = segments[i].getCheckCHKs();
-                       if(x + segURIs.length > countCheckBlocks)
-                               throw new IllegalStateException("x="+x+", 
segURIs="+segURIs.length+", countCheckBlocks="+countCheckBlocks);
-                       System.arraycopy(segURIs, 0, uris, x, segURIs.length);
-                       x += segURIs.length;
-               }
-
-               if(persistent) {
-                       for(int i=0;i<uris.length;i++)
-                               container.activate(uris[i], 5);
-               }
-               
-               if(uris.length != x)
-                       throw new IllegalStateException("Total is wrong");
-               
-               return uris;
-       }
-
-       /**
-        * Segments MUST BE ACTIVATED TO DEPTH 1 BEFORE CALLING,
-        * and should be deactivated by caller.
-        * @param container
-        * @return
-        */
-       private ClientCHK[] getDataCHKs(ObjectContainer container) {
-               // Copy check blocks from each segment into a FreenetURI[].
-               ClientCHK[] uris = new ClientCHK[countDataBlocks];
-               int x = 0;
-               for(int i=0;i<segments.length;i++) {
-                       ClientCHK[] segURIs = segments[i].getDataCHKs();
-                       if(x + segURIs.length > countDataBlocks) 
-                               throw new IllegalStateException("x="+x+", 
segURIs="+segURIs.length+", countDataBlocks="+countDataBlocks);
-                       System.arraycopy(segURIs, 0, uris, x, segURIs.length);
-                       x += segURIs.length;
-               }
-
-               if(persistent) {
-                       for(int i=0;i<uris.length;i++)
-                               container.activate(uris[i], 5);
-               }
-               
-               if(uris.length != x)
-                       throw new IllegalStateException("Total is wrong");
-               
-               return uris;
-       }
-
        public BaseClientPutter getParent() {
                return parent;
        }

Added: 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegmentRegisterJob.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegmentRegisterJob.java
                             (rev 0)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegmentRegisterJob.java
     2008-10-23 17:41:39 UTC (rev 23050)
@@ -0,0 +1,36 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.InsertException;
+
+public class SplitFileInserterSegmentRegisterJob implements DBJob {
+
+       final SplitFileInserterSegment seg;
+       
+       final int restartPriority;
+       
+       public SplitFileInserterSegmentRegisterJob(SplitFileInserterSegment 
segment, int restartPriority) {
+               seg = segment;
+               this.restartPriority = restartPriority;
+       }
+
+       public void run(ObjectContainer container, ClientContext context) {
+               container.activate(seg, 1);
+               try {
+                       seg.start(container, context);
+               } catch (InsertException e) {
+                       container.activate(seg.parent, 1);
+                       seg.finish(e, container, context, seg.parent);
+                       container.deactivate(seg.parent, 1);
+               }
+               container.deactivate(seg, 1);
+               context.jobRunner.removeRestartJob(this, restartPriority, 
container);
+       }
+
+       public void schedule(ObjectContainer container, ClientContext context, 
int nowPriority) {
+               context.jobRunner.queueRestartJob(this, restartPriority, 
container);
+               context.jobRunner.queue(this, nowPriority, false);
+       }
+
+}


Reply via email to