Author: toad
Date: 2009-02-04 18:37:30 +0000 (Wed, 04 Feb 2009)
New Revision: 25547
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
Segment-ise inserts. MAJOR UNTESTED CHANGE. This should significantly reduce
the amount of database work needed for splitfile inserts, especially when
starting them.
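For readers skimming the diff: the essence of the change is that a segment no
longer creates one SingleBlockInserter object per block (each of which had to
be constructed, stored, activated and deactivated in db4o); instead the segment
itself extends SendableInsert, keeps a list of pending block numbers, and hands
blocks to the scheduler on demand. A minimal sketch of the idea follows; the
names (PendingSegment, pickBlock, complete) are illustrative only, not the
actual Freenet API:

    // Sketch only: shows why per-segment scheduling touches the database
    // far less than one persistent inserter object per block.
    // PendingSegment/pickBlock/complete are hypothetical names.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    class PendingSegment {
        // One object per segment, instead of one inserter per block:
        // starting a segment registers a single insert with the scheduler.
        private final List<Integer> pending = new ArrayList<Integer>();
        private final Random random = new Random();

        PendingSegment(int dataBlocks, int checkBlocks) {
            for (int i = 0; i < dataBlocks + checkBlocks; i++)
                pending.add(i);
        }

        /** The scheduler asks the segment for one block to send. */
        synchronized Integer pickBlock() {
            if (pending.isEmpty()) return null;
            return pending.get(random.nextInt(pending.size()));
        }

        /** A block completed (success or permanent failure). */
        synchronized void complete(int blockNum) {
            pending.remove(Integer.valueOf(blockNum));
        }
    }

Registering one object per segment rather than one per block is what cuts the
database work, particularly at start-up, where previously every block inserter
had to be created and persisted before the insert could begin.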
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java	2009-02-04 18:36:32 UTC (rev 25546)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java	2009-02-04 18:37:30 UTC (rev 25547)
@@ -18,6 +18,7 @@
import freenet.client.Metadata;
import freenet.keys.CHKBlock;
import freenet.keys.ClientCHK;
+import freenet.node.PrioRunnable;
import freenet.support.Executor;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
@@ -75,9 +76,9 @@
fs.put("SegmentSize", segmentSize);
fs.put("CheckSegmentSize", checkSegmentSize);
SimpleFieldSet segs = new SimpleFieldSet(false);
-		for(int i=0;i<segments.length;i++) {
-			segs.put(Integer.toString(i), segments[i].getProgressFieldset());
-		}
+//		for(int i=0;i<segments.length;i++) {
+//			segs.put(Integer.toString(i), segments[i].getProgressFieldset());
+//		}
segs.put("Count", segments.length);
fs.put("Segments", segs);
return fs;
@@ -283,7 +284,7 @@
		return (SplitFileInserterSegment[]) segs.toArray(new SplitFileInserterSegment[segs.size()]);
}
-	public void start(ObjectContainer container, ClientContext context) throws InsertException {
+	public void start(ObjectContainer container, final ClientContext context) throws InsertException {
for(int i=0;i<segments.length;i++) {
if(persistent) {
container.activate(segments[i], 1);
@@ -291,7 +292,26 @@
				segJob.schedule(container, context, NativeThread.NORM_PRIORITY-1, persistent);
container.deactivate(segments[i], 1);
} else {
- segments[i].start(container, context);
+ if(!getCHKOnly)
+ segments[i].start(container, context);
+ else {
+				final SplitFileInserterSegment seg = segments[i];
+				context.mainExecutor.execute(new PrioRunnable() {
+
+ public int getPriority() {
+						return NativeThread.NORM_PRIORITY;
+ }
+
+ public void run() {
+ try {
+							seg.start(null, context);
+						} catch (InsertException e) {
+							fail(e, null, context);
+ }
+ }
+
+ }, "Schedule segment (get chk only)");
+ }
}
}
if(persistent)
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java	2009-02-04 18:36:32 UTC (rev 25546)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java	2009-02-04 18:37:30 UTC (rev 25547)
@@ -1,36 +1,50 @@
package freenet.client.async;
+import java.io.IOException;
import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import com.db4o.ObjectContainer;
import freenet.client.FECCallback;
import freenet.client.FECCodec;
import freenet.client.FECJob;
-import freenet.client.FECQueue;
import freenet.client.FailureCodeTracker;
import freenet.client.InsertContext;
import freenet.client.InsertException;
import freenet.client.Metadata;
import freenet.client.SplitfileBlock;
-import freenet.keys.BaseClientKey;
import freenet.keys.CHKBlock;
+import freenet.keys.CHKEncodeException;
import freenet.keys.ClientCHK;
+import freenet.keys.ClientCHKBlock;
import freenet.keys.ClientKey;
import freenet.keys.FreenetURI;
+import freenet.node.KeysFetchingLocally;
+import freenet.node.LowLevelPutException;
+import freenet.node.NodeClientCore;
+import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
+import freenet.node.SendableInsert;
+import freenet.node.SendableRequestItem;
+import freenet.node.SendableRequestSender;
import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
import freenet.support.api.Bucket;
+import freenet.support.io.BucketTools;
import freenet.support.io.CannotCreateFromFieldSetException;
-import freenet.support.io.SerializableToFieldSetBucket;
+import freenet.support.io.NativeThread;
import freenet.support.io.SerializableToFieldSetBucketUtil;
-public class SplitFileInserterSegment implements PutCompletionCallback, FECCallback {
+public class SplitFileInserterSegment extends SendableInsert implements FECCallback, Encodeable {
private static volatile boolean logMINOR;
final SplitFileInserter parent;
+ final BaseClientPutter putter;
final short splitfileAlgo;
@@ -42,10 +56,23 @@
final ClientCHK[] checkURIs;
- final SingleBlockInserter[] dataBlockInserters;
-
- final SingleBlockInserter[] checkBlockInserters;
-
+ final int[] dataRetries;
+ final int[] checkRetries;
+
+ final int[] dataConsecutiveRNFs;
+ final int[] checkConsecutiveRNFs;
+
+ /** Block numbers not finished */
+ final ArrayList<Integer> blocks;
+
+ final boolean[] dataFinished;
+ final boolean[] checkFinished;
+
+ final boolean[] dataFailed;
+ final boolean[] checkFailed;
+
+ final int maxRetries;
+
final InsertContext blockInsertContext;
final int segNo;
@@ -65,25 +92,17 @@
private final FailureCodeTracker errors;
private int blocksGotURI;
-
+ private int blocksSucceeded;
private int blocksCompleted;
private final boolean persistent;
-	// A persistent hashCode is helpful in debugging, and also means we can put
-	// these objects into sets etc when we need to.
-
- private final int hashCode;
-
- public int hashCode() {
- return hashCode;
- }
	public SplitFileInserterSegment(SplitFileInserter parent, boolean persistent, BaseClientPutter putter,
			short splitfileAlgo, int checkBlockCount, Bucket[] origDataBlocks,
			InsertContext blockInsertContext, boolean getCHKOnly, int segNo, ObjectContainer container) {
+ super(persistent);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
- hashCode = super.hashCode();
this.parent = parent;
this.getCHKOnly = getCHKOnly;
this.persistent = persistent;
@@ -94,11 +113,22 @@
checkBlocks = new Bucket[checkBlockCount];
checkURIs = new ClientCHK[checkBlockCount];
dataURIs = new ClientCHK[origDataBlocks.length];
- dataBlockInserters = new SingleBlockInserter[dataBlocks.length];
-		checkBlockInserters = new SingleBlockInserter[checkBlocks.length];
+ dataRetries = new int[origDataBlocks.length];
+ checkRetries = new int[checkBlockCount];
+ dataFinished = new boolean[origDataBlocks.length];
+ checkFinished = new boolean[checkBlockCount];
+ dataFailed = new boolean[origDataBlocks.length];
+ checkFailed = new boolean[checkBlockCount];
+ dataConsecutiveRNFs = new int[origDataBlocks.length];
+ checkConsecutiveRNFs = new int[checkBlockCount];
+ blocks = new ArrayList<Integer>();
putter.addBlocks(dataURIs.length + checkURIs.length, container);
		putter.addMustSucceedBlocks(dataURIs.length + checkURIs.length, container);
this.segNo = segNo;
+ if(persistent) container.activate(blockInsertContext, 1);
+ maxRetries = blockInsertContext.maxInsertRetries;
+ this.putter = putter;
+
}
/**
@@ -109,12 +139,13 @@
	public SplitFileInserterSegment(SplitFileInserter parent, boolean persistent, BaseClientPutter putter,
			SimpleFieldSet fs, short splitfileAlgorithm, InsertContext ctx,
			boolean getCHKOnly, int segNo, ClientContext context, ObjectContainer container) throws ResumeException {
- hashCode = super.hashCode();
+ super(persistent);
this.parent = parent;
this.splitfileAlgo = splitfileAlgorithm;
this.getCHKOnly = getCHKOnly;
this.persistent = persistent;
this.blockInsertContext = ctx;
+ this.maxRetries = ctx.maxInsertRetries;
this.segNo = segNo;
if (!"SplitFileInserterSegment".equals(fs.get("Type")))
			throw new ResumeException("Wrong Type: " + fs.get("Type"));
@@ -148,7 +179,12 @@
dataBlocks = new Bucket[dataBlockCount];
dataURIs = new ClientCHK[dataBlockCount];
- dataBlockInserters = new SingleBlockInserter[dataBlockCount];
+ dataRetries = new int[dataBlockCount];
+ dataConsecutiveRNFs = new int[dataBlockCount];
+ dataFinished = new boolean[dataBlockCount];
+ dataFailed = new boolean[dataBlockCount];
+ blocks = new ArrayList<Integer>();
+ this.putter = putter;
		// Check blocks first, because if there are missing check blocks, we
		// need
@@ -169,7 +205,10 @@
}
checkBlocks = new Bucket[checkBlockCount];
checkURIs = new ClientCHK[checkBlockCount];
-		checkBlockInserters = new SingleBlockInserter[checkBlockCount];
+ checkRetries = new int[checkBlockCount];
+ checkConsecutiveRNFs = new int[checkBlockCount];
+ checkFinished = new boolean[checkBlockCount];
+ checkFailed = new boolean[checkBlockCount];
for (int i = 0; i < checkBlockCount; i++) {
String index = Integer.toString(i);
SimpleFieldSet blockFS = checkFS.subset(index);
@@ -252,7 +291,10 @@
		int checkBlocksCount = splitfileAlgo.countCheckBlocks();
this.checkURIs = new ClientCHK[checkBlocksCount];
this.checkBlocks = new Bucket[checkBlocksCount];
-		this.checkBlockInserters = new SingleBlockInserter[checkBlocksCount];
+ checkRetries = new int[checkBlocksCount];
+ checkConsecutiveRNFs = new int[checkBlocksCount];
+ checkFinished = new boolean[checkBlocksCount];
+ checkFailed = new boolean[checkBlocksCount];
hasURIs = false;
}
@@ -328,87 +370,6 @@
		putter.addMustSucceedBlocks(dataURIs.length + checkURIs.length, container);
}
- public synchronized SimpleFieldSet getProgressFieldset() {
- SimpleFieldSet fs = new SimpleFieldSet(false); // these get BIG
- fs.putSingle("Type", "SplitFileInserterSegment");
- fs.put("Finished", finished);
- // If true, check blocks which are null are finished
- fs.put("Encoded", encoded);
- // If true, data blocks which are null are finished
- fs.put("Started", started);
- fs.tput("Errors", errors.toFieldSet(false));
- SimpleFieldSet dataFS = new SimpleFieldSet(false);
- dataFS.put("Count", dataBlocks.length);
- for (int i = 0; i < dataBlocks.length; i++) {
- SimpleFieldSet block = new SimpleFieldSet(false);
- if (dataURIs[i] != null)
-				block.putSingle("URI", dataURIs[i].getURI().toString());
- SingleBlockInserter sbi = dataBlockInserters[i];
- // If started, then sbi = null => block finished.
- boolean finished = started && sbi == null;
- if (started) {
- block.put("Finished", finished);
- }
- Bucket data = dataBlocks[i];
- if (data == null && finished) {
- // Ignore
- if (logMINOR)
-					Logger.minor(this, "Could not save to disk bucket: null");
-			} else if (data instanceof SerializableToFieldSetBucket) {
-				SimpleFieldSet tmp = ((SerializableToFieldSetBucket) data).toFieldSet();
-				if (tmp == null) {
-					if (logMINOR)
-						Logger.minor(this, "Could not save to disk data: " + data);
- return null;
- }
- block.put("Data", tmp);
- } else {
- if (logMINOR)
-					Logger.minor(this,
-							"Could not save to disk (not serializable to fieldset): " + data);
- return null;
- }
- if (!block.isEmpty())
- dataFS.put(Integer.toString(i), block);
- }
- fs.put("DataBlocks", dataFS);
- SimpleFieldSet checkFS = new SimpleFieldSet(false);
- checkFS.put("Count", checkBlocks.length);
- for (int i = 0; i < checkBlocks.length; i++) {
- SimpleFieldSet block = new SimpleFieldSet(false);
- if (checkURIs[i] != null)
-				block.putSingle("URI", checkURIs[i].getURI().toString());
-			SingleBlockInserter sbi = checkBlockInserters[i];
-			// If encoded, then sbi == null => block finished
-			boolean finished = encoded && sbi == null && checkURIs[i] != null;
- if (encoded) {
- block.put("Finished", finished);
- }
- if (!finished) {
- Bucket data = checkBlocks[i];
-				if (data != null
-						&& data instanceof SerializableToFieldSetBucket) {
-					SimpleFieldSet tmp = ((SerializableToFieldSetBucket) data).toFieldSet();
-					if (tmp == null)
-						Logger.error(this, "Could not serialize " + data + " - check block " + i + " of " + segNo);
- else
- block.put("Data", tmp);
-				} else if (encoded) {
-					Logger.error(this, "Could not save to disk (null or not serializable to fieldset) encoded="+encoded+" finished="+finished + " checkURI[i]="+checkURIs[i]+" : " + data, new Exception());
- return null;
- }
- }
- if (!block.isEmpty())
- checkFS.put(Integer.toString(i), block);
- }
- fs.put("CheckBlocks", checkFS);
- return fs;
- }
-
	public void start(ObjectContainer container, ClientContext context) throws InsertException {
		// Always called by parent, so don't activate or deactivate parent.
if(persistent) {
@@ -424,16 +385,12 @@
}
boolean fin = true;
- for (int i = 0; i < dataBlockInserters.length; i++) {
+ for (int i = 0; i < dataBlocks.length; i++) {
			if (dataBlocks[i] != null) { // else already finished on creation
-				dataBlockInserters[i] = new SingleBlockInserter(parent.parent,
-						dataBlocks[i], (short) -1, FreenetURI.EMPTY_CHK_URI,
-						blockInsertContext, this, false, CHKBlock.DATA_LENGTH,
-						i, getCHKOnly, false, false, parent.token, container, context, persistent, false);
-				dataBlockInserters[i].schedule(container, context);
-				if(persistent)
-					container.deactivate(dataBlockInserters[i], 1);
				fin = false;
+				synchronized(this) {
+					blocks.add(i);
+				}
			} else {
				parent.parent.completedBlock(true, container, context);
}
@@ -464,16 +421,11 @@
}
fin = false;
} else {
- for (int i = 0; i < checkBlockInserters.length; i++) {
+ for (int i = 0; i < checkBlocks.length; i++) {
if (checkBlocks[i] != null) {
-					checkBlockInserters[i] = new SingleBlockInserter(
-							parent.parent, checkBlocks[i], (short) -1,
-							FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
-							false, CHKBlock.DATA_LENGTH, i + dataBlocks.length,
-							getCHKOnly, false, false, parent.token, container, context, persistent, false);
-					checkBlockInserters[i].schedule(container, context);
-					if(persistent)
-						container.deactivate(checkBlockInserters[i], 1);
+					synchronized(this) {
+						blocks.add(i + dataBlocks.length);
+					}
					fin = false;
				} else
					parent.parent.completedBlock(true, container, context);
@@ -493,6 +445,8 @@
parent.segmentFetchable(this, container);
if (fin)
finish(container, context, parent);
+ else
+ schedule(container, context);
if (finished) {
parent.segmentFinished(this, container, context);
}
@@ -501,6 +455,47 @@
}
}
+	private void schedule(ObjectContainer container, ClientContext context) {
+		if(!getCHKOnly) {
+			this.getScheduler(context).registerInsert(this, persistent, false, container);
+		} else {
+			tryEncode(container, context);
+		}
+	}
+
+	public void tryEncode(ObjectContainer container, ClientContext context) {
+		for(int i=0;i<dataBlocks.length;i++) {
+			if(dataURIs[i] == null && dataBlocks[i] != null) {
+				try {
+					ClientCHK key = (ClientCHK) encodeBucket(dataBlocks[i]).getClientKey();
+					onEncode(i, key, container, context);
+				} catch (CHKEncodeException e) {
+					fail(new InsertException(InsertException.INTERNAL_ERROR, e, null), container, context);
+				} catch (IOException e) {
+					fail(new InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+				}
+			} else if(dataURIs[i] == null && dataBlocks[i] == null) {
+				fail(new InsertException(InsertException.INTERNAL_ERROR, "Data block "+i+" cannot be encoded: no data", null), container, context);
+			}
+		}
+		if(encoded) {
+			for(int i=0;i<checkBlocks.length;i++) {
+				if(checkURIs[i] == null && checkBlocks[i] != null) {
+					try {
+						ClientCHK key = (ClientCHK) encodeBucket(checkBlocks[i]).getClientKey();
+						onEncode(i + dataBlocks.length, key, container, context);
+					} catch (CHKEncodeException e) {
+						fail(new InsertException(InsertException.INTERNAL_ERROR, e, null), container, context);
+					} catch (IOException e) {
+						fail(new InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+					}
+				} else if(checkURIs[i] == null && checkBlocks[i] == null) {
+					fail(new InsertException(InsertException.INTERNAL_ERROR, "Check block "+i+" cannot be encoded: no data", null), container, context);
+				}
+			}
+		}
+	}
+
	public void onDecodedSegment(ObjectContainer container, ClientContext context, FECJob job, Bucket[] dataBuckets, Bucket[] checkBuckets, SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {} // irrelevant
	public void onEncodedSegment(ObjectContainer container, ClientContext context, FECJob job, Bucket[] dataBuckets, Bucket[] checkBuckets, SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {
@@ -534,8 +529,8 @@
// Start the inserts
try {
if(logMINOR)
-				Logger.minor(this, "Scheduling "+checkBlockInserters.length+" check blocks...");
-			for (int i = 0; i < checkBlockInserters.length; i++) {
+				Logger.minor(this, "Scheduling "+checkBlocks.length+" check blocks...");
+			for (int i = 0; i < checkBlocks.length; i++) {
				// See comments on FECCallback: WE MUST COPY THE DATA BACK!!!
checkBlocks[i] = checkBuckets[i];
if(checkBlocks[i] == null) {
@@ -545,18 +540,15 @@
}
if(persistent)
checkBlocks[i].storeTo(container);
-				if(checkBlockInserters[i] != null) continue;
-				checkBlockInserters[i] = new SingleBlockInserter(parent.parent,
-						checkBlocks[i], (short) -1, FreenetURI.EMPTY_CHK_URI,
-						blockInsertContext, this, false, CHKBlock.DATA_LENGTH,
-						i + dataBlocks.length, getCHKOnly, false, false,
-						parent.token, container, context, persistent, false);
-				checkBlockInserters[i].schedule(container, context);
				if(persistent) {
-					container.deactivate(checkBlockInserters[i], 1);
					container.deactivate(checkBlocks[i], 1);
				}
			}
+			synchronized(this) {
+				for(int i=0;i<checkBlocks.length;i++)
+					blocks.add(dataBlocks.length + i);
+			}
+			schedule(container, context);
} catch (Throwable t) {
Logger.error(this, "Caught " + t + " while encoding " +
this, t);
InsertException ex = new InsertException(
@@ -579,8 +571,8 @@
parent.encodedSegment(this, container, context);
synchronized (this) {
-			for (int i = 0; i < dataBlockInserters.length; i++) {
-				if (dataBlockInserters[i] == null && dataBlocks[i] != null) {
+			for (int i = 0; i < dataBlocks.length; i++) {
+				if (dataFinished[i] && dataBlocks[i] != null) {
if(persistent)
container.activate(dataBlocks[i], 1);
dataBlocks[i].free();
if(persistent)
@@ -640,11 +632,8 @@
parent.segmentFinished(this, container, context);
}
-	public void onEncode(BaseClientKey k, ClientPutState state, ObjectContainer container, ClientContext context) {
-		ClientCHK key = (ClientCHK) k;
-		SingleBlockInserter sbi = (SingleBlockInserter) state;
-		int x = sbi.token;
-		if(logMINOR) Logger.minor(this, "Encoded block "+x+" on "+this+" : "+sbi);
+	private void onEncode(int x, ClientCHK key, ObjectContainer container, ClientContext context) {
+		if(logMINOR) Logger.minor(this, "Encoded block "+x+" on "+this);
synchronized (this) {
if (finished)
return;
@@ -690,113 +679,6 @@
container.deactivate(parent, 1);
}
-	public void onSuccess(ClientPutState state, ObjectContainer container, ClientContext context) {
- if(persistent) {
- container.activate(parent, 1);
- container.activate(parent.parent, 1);
- }
- if (parent.parent.isCancelled()) {
- parent.cancel(container, context);
- return;
- }
- SingleBlockInserter sbi = (SingleBlockInserter) state;
- int x = sbi.token;
- completed(x, container, context);
- if(persistent) {
- container.deactivate(parent.parent, 1);
- container.deactivate(parent, 1);
- }
- }
-
-	public void onFailure(InsertException e, ClientPutState state, ObjectContainer container, ClientContext context) {
- if(persistent) {
- container.activate(parent, 1);
- container.activate(parent.parent, 1);
- container.activate(errors, 1);
- }
- if (parent.parent.isCancelled()) {
- parent.cancel(container, context);
- return;
- }
- SingleBlockInserter sbi = (SingleBlockInserter) state;
- int x = sbi.token;
- errors.merge(e);
- completed(x, container, context);
- if(persistent) {
- container.deactivate(parent.parent, 1);
- container.deactivate(parent, 1);
- container.deactivate(errors, 1);
- }
- }
-
-	private void completed(int x, ObjectContainer container, ClientContext context) {
- int total = innerCompleted(x, container);
- if (total == -1)
- return;
- if (total == dataBlockInserters.length) {
- if(persistent)
- container.activate(parent, 1);
- parent.segmentFetchable(this, container);
- }
-		if (total != dataBlockInserters.length + checkBlockInserters.length)
- return;
- if(persistent)
- container.store(this);
- finish(container, context, parent);
- }
-
- /**
- * Called when a block has completed.
- *
- * @param x
- * The block number.
-	 * @return -1 if the segment has already finished, otherwise the number of
-	 * completed blocks.
-	 */
-	private synchronized int innerCompleted(int x, ObjectContainer container) {
- if (logMINOR)
- Logger.minor(this, "Completed: " + x + " on " + this
- + " ( completed=" + blocksCompleted +
", total="
- + (dataBlockInserters.length +
checkBlockInserters.length));
-
- if (finished)
- return -1;
- if (x >= dataBlocks.length) {
- x -= dataBlocks.length;
- if (checkBlockInserters[x] == null) {
-				Logger.error(this, "Completed twice: check block " + x + " on "
-						+ this, new Exception());
- return blocksCompleted;
- }
- checkBlockInserters[x] = null;
- if(persistent)
- container.activate(checkBlocks[x], 1);
- checkBlocks[x].free();
- if(persistent)
- checkBlocks[x].removeFrom(container);
- checkBlocks[x] = null;
- } else {
- if (dataBlockInserters[x] == null) {
-				Logger.error(this, "Completed twice: data block " + x + " on "
-						+ this, new Exception());
- return blocksCompleted;
- }
- dataBlockInserters[x] = null;
- if (encoded) {
- if(persistent)
- container.activate(dataBlocks[x], 1);
- dataBlocks[x].free();
- if(persistent)
- dataBlocks[x].removeFrom(container);
- dataBlocks[x] = null;
- }
- }
- blocksCompleted++;
- if(persistent)
- container.store(this);
- return blocksCompleted;
- }
-
public synchronized boolean isFinished() {
return finished;
}
@@ -839,37 +721,7 @@
}
	private void cancelInner(ObjectContainer container, ClientContext context) {
- for (int i = 0; i < dataBlockInserters.length; i++) {
- SingleBlockInserter sbi = dataBlockInserters[i];
- if(persistent)
- container.activate(sbi, 1);
- if (sbi != null)
- sbi.cancel(container, context);
- Bucket d = dataBlocks[i];
- if (d != null) {
- if(persistent)
- container.activate(d, 5);
- d.free();
- if(persistent)
- d.removeFrom(container);
- dataBlocks[i] = null;
- }
- }
- for (int i = 0; i < checkBlockInserters.length; i++) {
- SingleBlockInserter sbi = checkBlockInserters[i];
- if(persistent)
- container.activate(sbi, 1);
- if (sbi != null)
- sbi.cancel(container, context);
- Bucket d = checkBlocks[i];
- if (d != null) {
- if(persistent)
- container.activate(d, 5);
- d.free();
- d.removeFrom(container);
- checkBlocks[i] = null;
- }
- }
+ super.unregister(container, context);
if(persistent) {
container.store(this);
container.activate(parent, 1);
@@ -909,10 +761,22 @@
* encoded ASAP.
*/
	public void forceEncode(ObjectContainer container, ClientContext context) {
-		context.backgroundBlockEncoder.queue(dataBlockInserters, container, context);
-		context.backgroundBlockEncoder.queue(checkBlockInserters, container, context);
+ context.backgroundBlockEncoder.queue(this, container, context);
}
+	public void fail(InsertException e, ObjectContainer container, ClientContext context) {
+		synchronized(this) {
+			if(finished) {
+				Logger.error(this, "Failing but already finished on "+this);
+				return;
+			}
+			finished = true;
+			Logger.error(this, "Insert segment failed: "+e+" for "+this, e);
+			this.toThrow = e;
+		}
+		cancelInner(container, context);
+	}
+
+	public void onFailed(Throwable t, ObjectContainer container, ClientContext context) {
synchronized(this) {
if(finished) {
@@ -925,4 +789,563 @@
}
cancelInner(container, context);
}
+
+	Bucket getBucket(int blockNum) {
+		if(blockNum >= dataBlocks.length)
+			return checkBlocks[blockNum - dataBlocks.length];
+		else
+			return dataBlocks[blockNum];
+	}
+
+	private BlockItem getBlockItem(ObjectContainer container, ClientContext context, int blockNum) throws IOException {
+		Bucket sourceData = getBucket(blockNum);
+		boolean deactivateBucket = false;
+		if(persistent) {
+			deactivateBucket = !container.ext().isActive(sourceData);
+			if(deactivateBucket)
+				container.activate(sourceData, 1);
+		}
+		Bucket data = sourceData.createShadow();
+		if(data == null) {
+			data = context.tempBucketFactory.makeBucket(sourceData.size());
+ BucketTools.copy(sourceData, data);
+ }
+ if(persistent) {
+ if(deactivateBucket)
+ container.deactivate(sourceData, 1);
+ }
+ return new BlockItem(this, blockNum, data, persistent);
+ }
+
+ private int hashCodeForBlock(int blockNum) {
+		// FIXME: Standard hashCode() pattern assumes both inputs are evenly
+		// distributed ... this is not true here.
+ return hashCode() * (blockNum + 1);
+ }
+
+ private static class BlockItem implements SendableRequestItem {
+
+ private final boolean persistent;
+ private final Bucket copyBucket;
+ private final int hashCode;
+ /** STRICTLY for purposes of equals() !!! */
+ private final SplitFileInserterSegment parent;
+ private final int blockNum;
+
+		BlockItem(SplitFileInserterSegment parent, int blockNum, Bucket bucket, boolean persistent) throws IOException {
+ this.parent = parent;
+ this.blockNum = blockNum;
+ this.copyBucket = bucket;
+ this.hashCode = parent.hashCodeForBlock(blockNum);
+ this.persistent = persistent;
+ }
+
+ public void dump() {
+ copyBucket.free();
+ }
+
+ public int hashCode() {
+ return hashCode;
+ }
+
+		public boolean equals(Object o) {
+			if(o instanceof BlockItem) {
+				if(((BlockItem)o).parent == parent && ((BlockItem)o).blockNum == blockNum) return true;
+			} else if(o instanceof FakeBlockItem) {
+				if(((FakeBlockItem)o).getParent() == parent && ((FakeBlockItem)o).blockNum == blockNum) return true;
+			}
+ return false;
+ }
+
+ }
+
+ // Used for testing whether a block is already queued.
+ private class FakeBlockItem implements SendableRequestItem {
+
+ private final int blockNum;
+ private final int hashCode;
+
+ FakeBlockItem(int blockNum) {
+ this.blockNum = blockNum;
+ this.hashCode = hashCodeForBlock(blockNum);
+
+ }
+
+ public void dump() {
+ // Do nothing
+ }
+
+ public SplitFileInserterSegment getParent() {
+ return SplitFileInserterSegment.this;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+		public boolean equals(Object o) {
+			if(o instanceof BlockItem) {
+				if(((BlockItem)o).parent == SplitFileInserterSegment.this && ((BlockItem)o).blockNum == blockNum) return true;
+			} else if(o instanceof FakeBlockItem) {
+				if(((FakeBlockItem)o).getParent() == SplitFileInserterSegment.this && ((FakeBlockItem)o).blockNum == blockNum) return true;
+			}
+			return false;
+		}
+ }
+
+ @Override
+	public void onFailure(LowLevelPutException e, Object keyNum, ObjectContainer container, ClientContext context) {
+ BlockItem block = (BlockItem) keyNum;
+ // First report the error.
+ if(persistent)
+ container.activate(errors, 5);
+ switch(e.code) {
+ case LowLevelPutException.COLLISION:
+ Logger.error(this, "Collision on a CHK?!?!?");
+			fail(new InsertException(InsertException.INTERNAL_ERROR, "Collision on a CHK", null), container, context);
+ return;
+ case LowLevelPutException.INTERNAL_ERROR:
+ errors.inc(InsertException.INTERNAL_ERROR);
+ break;
+ case LowLevelPutException.REJECTED_OVERLOAD:
+ errors.inc(InsertException.REJECTED_OVERLOAD);
+ break;
+ case LowLevelPutException.ROUTE_NOT_FOUND:
+ errors.inc(InsertException.ROUTE_NOT_FOUND);
+ break;
+ case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
+ errors.inc(InsertException.ROUTE_REALLY_NOT_FOUND);
+ break;
+ default:
+ Logger.error(this, "Unknown LowLevelPutException code:
"+e.code);
+ errors.inc(InsertException.INTERNAL_ERROR);
+ }
+ if(persistent)
+ container.store(errors);
+		boolean isRNF = e.code == LowLevelPutException.ROUTE_NOT_FOUND ||
+			e.code == LowLevelPutException.ROUTE_REALLY_NOT_FOUND;
+ int blockNum = block.blockNum;
+ boolean treatAsSuccess = false;
+ boolean failedBlock = false;
+ int completed;
+ int succeeded;
+ synchronized(this) {
+			if(blockNum >= dataBlocks.length) {
+				// Check block.
+				int checkNum = blockNum - dataBlocks.length;
+				if(checkFinished[checkNum]) {
+					if(checkFailed[checkNum])
+						Logger.error(this, "Got onFailure() but block has already failed! Check block "+checkNum+" on "+this);
+					else
+						Logger.error(this, "Got onFailure() but block has already succeeded: Check block "+checkNum+" on "+this);
+					return;
+				}
+				if(isRNF) {
+					checkConsecutiveRNFs[checkNum]++;
+					if(persistent) container.activate(blockInsertContext, 1);
+					if(logMINOR) Logger.minor(this, "Consecutive RNFs: "+checkConsecutiveRNFs[checkNum]+" / "+blockInsertContext.consecutiveRNFsCountAsSuccess);
+					if(checkConsecutiveRNFs[checkNum] == blockInsertContext.consecutiveRNFsCountAsSuccess) {
+						// Treat as success
+						treatAsSuccess = true;
+					}
+				} else {
+					checkConsecutiveRNFs[checkNum] = 0;
+				}
+				if(!treatAsSuccess) {
+					checkRetries[checkNum]++;
+					if(checkRetries[checkNum] > maxRetries && maxRetries != -1) {
+						failedBlock = true;
+						// Treat as failed.
+						checkFinished[checkNum] = true;
+						checkFailed[checkNum] = true;
+						blocksCompleted++;
+						if(checkBlocks[checkNum] != null) {
+							if(persistent) container.activate(checkBlocks[checkNum], 1);
+							checkBlocks[checkNum].free();
+							checkBlocks[checkNum].removeFrom(container);
+							checkBlocks[checkNum] = null;
+						} else {
+							Logger.error(this, "Check block "+checkNum+" failed on "+this+" but bucket is already nulled out!");
+						}
+					}
+					// Else we are still registered, but will have to be
+					// re-selected: for persistent requests, the current
+					// PersistentChosenRequest will not re-run the same block.
+					// This is okay!
+				} else {
+					// Better handle it here to minimize race conditions. :|
+					checkFinished[checkNum] = true;
+					checkFailed[checkNum] = false; // Treating as succeeded
+					blocksCompleted++;
+					blocksSucceeded++;
+					if(checkBlocks[checkNum] != null) {
+						if(persistent) container.activate(checkBlocks[checkNum], 1);
+						checkBlocks[checkNum].free();
+						checkBlocks[checkNum].removeFrom(container);
+						checkBlocks[checkNum] = null;
+					} else {
+						Logger.error(this, "Check block "+checkNum+" succeeded (sort of) on "+this+" but bucket is already nulled out!");
+					}
+				}
+			} else {
+				// Data block.
+				if(dataFinished[blockNum]) {
+					if(dataFailed[blockNum])
+						Logger.error(this, "Got onFailure() but block has already failed! Data block "+blockNum+" on "+this);
+					else
+						Logger.error(this, "Got onFailure() but block has already succeeded: Data block "+blockNum+" on "+this);
+					return;
+				}
+				if(isRNF) {
+					dataConsecutiveRNFs[blockNum]++;
+					if(persistent) container.activate(blockInsertContext, 1);
+					if(logMINOR) Logger.minor(this, "Consecutive RNFs: "+dataConsecutiveRNFs[blockNum]+" / "+blockInsertContext.consecutiveRNFsCountAsSuccess);
+					if(dataConsecutiveRNFs[blockNum] == blockInsertContext.consecutiveRNFsCountAsSuccess) {
+						// Treat as success
+						treatAsSuccess = true;
+					}
+				} else {
+					dataConsecutiveRNFs[blockNum] = 0;
+				}
+				if(!treatAsSuccess) {
+					dataRetries[blockNum]++;
+					if(dataRetries[blockNum] > maxRetries && maxRetries != -1) {
+						failedBlock = true;
+						// Treat as failed.
+						dataFinished[blockNum] = true;
+						dataFailed[blockNum] = true;
+						blocksCompleted++;
+						if(dataBlocks[blockNum] != null) {
+							if(persistent) container.activate(dataBlocks[blockNum], 1);
+							dataBlocks[blockNum].free();
+							dataBlocks[blockNum].removeFrom(container);
+							dataBlocks[blockNum] = null;
+						} else {
+							Logger.error(this, "Data block "+blockNum+" failed on "+this+" but bucket is already nulled out!");
+						}
+					}
+					// Else we are still registered, but will have to be
+					// re-selected: for persistent requests, the current
+					// PersistentChosenRequest will not re-run the same block.
+					// This is okay!
+				} else {
+					// Better handle it here to minimize race conditions. :|
+					dataFinished[blockNum] = true;
+					dataFailed[blockNum] = false; // Treating as succeeded
+					blocksCompleted++;
+					blocksSucceeded++;
+					if(dataBlocks[blockNum] != null && encoded) {
+						if(persistent) container.activate(dataBlocks[blockNum], 1);
+						dataBlocks[blockNum].free();
+						dataBlocks[blockNum].removeFrom(container);
+						dataBlocks[blockNum] = null;
+					} else {
+						Logger.error(this, "Data block "+blockNum+" succeeded (sort of) on "+this+" but bucket is already nulled out!");
+					}
+				}
+			}
+ if(persistent)
+ container.store(this);
+ completed = blocksCompleted;
+ succeeded = blocksSucceeded;
+ }
+ if(persistent) {
+ container.activate(putter, 1);
+ if(failedBlock)
+ putter.failedBlock(container, context);
+ else if(treatAsSuccess)
+				putter.completedBlock(false, container, context);
+ container.deactivate(putter, 1);
+ }
+ if(succeeded == dataBlocks.length) {
+ container.activate(parent, 1);
+ parent.segmentFetchable(this, container);
+ container.deactivate(parent, 1);
+ } else if(completed == dataBlocks.length + checkBlocks.length) {
+ container.activate(parent, 1);
+ finish(container, context, parent);
+ container.deactivate(parent, 1);
+ }
+ }
+
+ @Override
+	public void onSuccess(Object keyNum, ObjectContainer container, ClientContext context) {
+ BlockItem block = (BlockItem) keyNum;
+ int blockNum = block.blockNum;
+ int completed;
+ int succeeded;
+ synchronized(this) {
+			if(blockNum >= dataBlocks.length) {
+				// Check block.
+				int checkNum = blockNum - dataBlocks.length;
+ if(!checkFinished[checkNum]) {
+ checkFinished[checkNum] = true;
+ checkFailed[checkNum] = false;
+ blocksCompleted++;
+ blocksSucceeded++;
+ } else {
+					if(checkFailed[checkNum])
+						Logger.error(this, "Got onSuccess() but block has already failed! Check block "+checkNum+" on "+this);
+					else
+						Logger.error(this, "Got onSuccess() but block has already succeeded: Check block "+checkNum+" on "+this);
+ return;
+ }
+ if(checkBlocks[checkNum] != null) {
+					if(persistent) container.activate(checkBlocks[checkNum], 1);
+					checkBlocks[checkNum].free();
+					checkBlocks[checkNum].removeFrom(container);
+					checkBlocks[checkNum] = null;
+				} else {
+					Logger.error(this, "Check block "+checkNum+" succeeded on "+this+" but bucket is already nulled out!");
+				}
+ } else {
+ // Data block
+ if(!dataFinished[blockNum]) {
+ dataFinished[blockNum] = true;
+ dataFailed[blockNum] = false;
+ blocksCompleted++;
+ blocksSucceeded++;
+ } else {
+ if(dataFailed[blockNum])
+ Logger.error(this, "Got
onSuccess() but block has already failed! Data block "+blockNum+" on "+this);
+ else
+ Logger.error(this, "Got
onSuccess() but block has already succeeded: Data block "+blockNum+" on "+this);
+ return;
+ }
+ if(encoded && dataBlocks[blockNum] != null) {
+					if(persistent) container.activate(dataBlocks[blockNum], 1);
+					dataBlocks[blockNum].free();
+					dataBlocks[blockNum].removeFrom(container);
+					dataBlocks[blockNum] = null;
+				} else if(dataBlocks[blockNum] == null) {
+					Logger.error(this, "Data block "+blockNum+" succeeded on "+this+" but bucket is already nulled out!");
+ }
+ }
+ if(persistent)
+ container.store(this);
+ completed = blocksCompleted;
+ succeeded = blocksSucceeded;
+ }
+ if(persistent) {
+ container.activate(putter, 1);
+ putter.completedBlock(false, container, context);
+ container.deactivate(putter, 1);
+ }
+ if(succeeded == dataBlocks.length) {
+ container.activate(parent, 1);
+ parent.segmentFetchable(this, container);
+ container.deactivate(parent, 1);
+ } else if(completed == dataBlocks.length + checkBlocks.length) {
+ container.activate(parent, 1);
+ finish(container, context, parent);
+ container.deactivate(parent, 1);
+ }
+ }
+
+ @Override
+	public SendableRequestItem[] allKeys(ObjectContainer container, ClientContext context) {
+ return sendableKeys(container, context);
+ }
+
+ @Override
+	public SendableRequestItem chooseKey(KeysFetchingLocally keys, ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(blocks, 1);
+ }
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ synchronized(this) {
+ if(finished) return null;
+ if(blocks.isEmpty()) {
+ if(logMINOR)
+					Logger.minor(this, "No blocks to remove");
+ return null;
+ }
+ for(int i=0;i<10;i++) {
+ Integer ret;
+ int x;
+ if(blocks.size() == 0) return null;
+ x = context.random.nextInt(blocks.size());
+ ret = blocks.get(x);
+ int num = ret;
+
+ // Check whether it is already running
+ if(!persistent) {
+					if(keys.hasTransientInsert(this, new FakeBlockItem(num)))
+ return null;
+ }
+
+ try {
+					return getBlockItem(container, context, num);
+				} catch (IOException e) {
+					fail(new InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+ return null;
+ }
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public RequestClient getClient(ObjectContainer container) {
+ if(persistent) container.activate(putter, 1);
+ return putter.getClient();
+ }
+
+ @Override
+ public ClientRequester getClientRequest() {
+ return putter;
+ }
+
+ @Override
+ public short getPriorityClass(ObjectContainer container) {
+ container.activate(parent, 1);
+ return putter.getPriorityClass();
+ }
+
+ @Override
+ public int getRetryCount() {
+ // No point scheduling inserts by retry count.
+		// FIXME: Either implement sub-segments to schedule by retry count,
+		// or (more likely imho) make the scheduler not care about retry counts for inserts.
+ return 0;
+ }
+
+ @Override
+	public SendableRequestSender getSender(ObjectContainer container, ClientContext context) {
+		return new SendableRequestSender() {
+
+			public boolean send(NodeClientCore core, RequestScheduler sched, final ClientContext context, ChosenBlock req) {
+				// Ignore keyNum, key, since we're only sending one block.
+				try {
+					BlockItem block = (BlockItem) req.token;
+					if(logMINOR) Logger.minor(this, "Starting request: "+SplitFileInserterSegment.this+" block number "+block.blockNum);
+					ClientCHKBlock b;
+					try {
+						b = encodeBucket(block.copyBucket);
+					} catch (CHKEncodeException e) {
+						throw new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, e.toString() + ":" + e.getMessage(), e);
+					} catch (MalformedURLException e) {
+						throw new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, e.toString() + ":" + e.getMessage(), e);
+					} catch (IOException e) {
+						throw new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, e.toString() + ":" + e.getMessage(), e);
+					} finally {
+						block.copyBucket.free();
+					}
+					final ClientCHK key = (ClientCHK) b.getClientKey();
+					final int num = block.blockNum;
+					if(block.persistent) {
+						context.jobRunner.queue(new DBJob() {
+
+							public void run(ObjectContainer container, ClientContext context) {
+								container.activate(SplitFileInserterSegment.this, 1);
+								onEncode(num, key, container, context);
+								container.deactivate(SplitFileInserterSegment.this, 1);
+							}
+
+						}, NativeThread.NORM_PRIORITY+1, false);
+					} else {
+						context.mainExecutor.execute(new Runnable() {
+
+							public void run() {
+								onEncode(num, key, null, context);
+							}
+
+						}, "Got URI");
+
+					}
+					if(b != null)
+						core.realPut(b, req.cacheLocalRequests);
+					else {
+						Logger.error(this, "Asked to send empty block on "+SplitFileInserterSegment.this, new Exception("error"));
+						return false;
+					}
+				} catch (LowLevelPutException e) {
+					req.onFailure(e, context);
+					if(logMINOR) Logger.minor(this, "Request failed: "+SplitFileInserterSegment.this+" for "+e);
+					return true;
+				}
+				if(logMINOR) Logger.minor(this, "Request succeeded: "+SplitFileInserterSegment.this);
+				req.onInsertSuccess(context);
+				return true;
+			}
+
+		};
+	}
+
+	protected ClientCHKBlock encodeBucket(Bucket copyBucket) throws CHKEncodeException, IOException {
+		return ClientCHKBlock.encode(copyBucket, false, true, (short)-1, CHKBlock.DATA_LENGTH);
+	}
+
+ @Override
+ public boolean isCancelled(ObjectContainer container) {
+ return finished;
+ }
+
+ @Override
+ public boolean isSSK() {
+ return false;
+ }
+
+ @Override
+	public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(blocks, 1);
+ }
+ Integer[] blockNumbers;
+ synchronized(this) {
+			blockNumbers = blocks.toArray(new Integer[blocks.size()]);
+		}
+		ArrayList<PersistentChosenBlock> blocks = new ArrayList<PersistentChosenBlock>();
+ Arrays.sort(blockNumbers);
+ int prevBlockNumber = -1;
+ for(int i=0;i<blockNumbers.length;i++) {
+ int blockNumber = blockNumbers[i];
+ if(blockNumber == prevBlockNumber) {
+ Logger.error(this, "Duplicate block number in
makeBlocks() in "+this+": two copies of "+blockNumber);
+ continue;
+ }
+ prevBlockNumber = blockNumber;
+ SendableRequestItem item;
+ try {
+				item = getBlockItem(container, context, blockNumber);
+			} catch (IOException e) {
+				fail(new InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+ return null;
+ }
+			PersistentChosenBlock block = new PersistentChosenBlock(false, request, item, null, null, sched);
+			if(logMINOR) Logger.minor(this, "Created block "+block+" for block number "+blockNumber+" on "+this);
+ blocks.add(block);
+ }
+ blocks.trimToSize();
+ if(persistent) {
+ container.deactivate(blocks, 1);
+ }
+ return blocks;
+ }
+
+ @Override
+	public synchronized SendableRequestItem[] sendableKeys(ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(blocks, 1);
+ }
+		SendableRequestItem[] items = new SendableRequestItem[blocks.size()];
+ for(int i=0;i<blocks.size();i++)
+ try {
+				items[i] = getBlockItem(container, context, blocks.get(i));
+ } catch (IOException e) {
+				fail(new InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+ return null;
+ }
+ if(persistent) {
+ container.deactivate(blocks, 1);
+ }
+ return items;
+ }
+
+ public synchronized boolean isEmpty(ObjectContainer container) {
+ return (finished || blocks.isEmpty());
+ }
}
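
A note on the retry accounting in onFailure() above, since it is easy to
misread in diff form: route-not-found errors increment a consecutive-RNF
counter, and once that counter reaches consecutiveRNFsCountAsSuccess (from the
InsertContext) the block is treated as successfully inserted; any other failure
resets the counter; and a block only fails permanently once its retry count
exceeds maxRetries (-1 meaning retry forever). The same rules apply separately
to data and check blocks. A standalone sketch of just those rules follows;
BlockRetryState is a hypothetical helper written for illustration, not part of
this commit:

    // Sketch of the per-block retry rules used in onFailure() above.
    // Hypothetical helper, not committed code.
    class BlockRetryState {
        int retries;
        int consecutiveRNFs;

        /**
         * @return +1 if the block should now be treated as succeeded,
         *         -1 if it has permanently failed, 0 if it should be retried.
         */
        int onFailure(boolean routeNotFound, int rnfsCountAsSuccess, int maxRetries) {
            if (routeNotFound) {
                consecutiveRNFs++;
                if (consecutiveRNFs == rnfsCountAsSuccess)
                    return +1; // enough consecutive RNFs: treat as success
            } else {
                consecutiveRNFs = 0; // only *consecutive* RNFs count
            }
            retries++;
            if (maxRetries != -1 && retries > maxRetries)
                return -1; // out of retries: permanent failure
            return 0; // stay registered; will be re-selected later
        }
    }

Until one of the terminal outcomes is reached, the block simply stays in the
segment's pending list and will be re-selected by the scheduler later.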