Author: toad
Date: 2006-07-14 19:52:30 +0000 (Fri, 14 Jul 2006)
New Revision: 9608
Added:
trunk/freenet/src/freenet/client/async/ResumeException.java
trunk/freenet/src/freenet/support/io/CannotCreateFromFieldSetException.java
trunk/freenet/src/freenet/support/io/NullPersistentFileTracker.java
trunk/freenet/src/freenet/support/io/PersistentFileTracker.java
trunk/freenet/src/freenet/support/io/SerializableToFieldSetBucketUtil.java
Modified:
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/InserterContext.java
trunk/freenet/src/freenet/client/async/ClientPutter.java
trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
trunk/freenet/src/freenet/client/async/SingleFileInserter.java
trunk/freenet/src/freenet/client/async/SplitFileInserter.java
trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/node/fcp/ClientPut.java
trunk/freenet/src/freenet/support/PaddedEphemerallyEncryptedBucket.java
trunk/freenet/src/freenet/support/RandomAccessFileBucket.java
trunk/freenet/src/freenet/support/io/FileBucket.java
trunk/freenet/src/freenet/support/io/PersistentTempBucketFactory.java
Log:
880: Persistent inserts automatically resume on startup from where they stopped.
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -17,6 +17,8 @@
import freenet.support.BucketFactory;
import freenet.support.BucketTools;
import freenet.support.Logger;
+import freenet.support.io.NullPersistentFileTracker;
+import freenet.support.io.PersistentFileTracker;
public class HighLevelSimpleClientImpl implements HighLevelSimpleClient {
@@ -24,6 +26,7 @@
private final short priorityClass;
private final BucketFactory bucketFactory;
private final BucketFactory persistentBucketFactory;
+ private final PersistentFileTracker persistentFileTracker;
private final Node node;
/** One CEP for all requests and inserts */
private final ClientEventProducer globalEventProducer;
@@ -71,6 +74,7 @@
archiveManager = mgr;
this.priorityClass = priorityClass;
bucketFactory = bf;
+ this.persistentFileTracker = node.persistentTempBucketFactory;
random = r;
this.globalEventProducer = new SimpleEventProducer();
globalEventProducer.addEventListener(new
EventLogger(Logger.MINOR));
@@ -118,7 +122,7 @@
InserterContext context = getInserterContext(true);
PutWaiter pw = new PutWaiter();
ClientPutter put = new ClientPutter(pw, insert.data,
insert.desiredURI, insert.clientMetadata,
- context, node.chkPutScheduler,
node.sskPutScheduler, priorityClass, getCHKOnly, isMetadata, this);
+ context, node.chkPutScheduler,
node.sskPutScheduler, priorityClass, getCHKOnly, isMetadata, this, null);
put.start();
return pw.waitForCompletion();
}
@@ -174,7 +178,9 @@
}
public InserterContext getInserterContext(boolean forceNonPersistent) {
- return new InserterContext(bucketFactory, forceNonPersistent ?
bucketFactory : persistentBucketFactory, random, INSERT_RETRIES,
CONSECUTIVE_RNFS_ASSUME_SUCCESS,
+ return new InserterContext(bucketFactory, forceNonPersistent ?
bucketFactory : persistentBucketFactory,
+ forceNonPersistent ? new
NullPersistentFileTracker() : persistentFileTracker,
+ random, INSERT_RETRIES,
CONSECUTIVE_RNFS_ASSUME_SUCCESS,
SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, cacheLocalRequests,
node.uskManager);
}
Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java 2006-07-14
19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/InserterContext.java 2006-07-14
19:52:30 UTC (rev 9608)
@@ -5,12 +5,15 @@
import freenet.client.events.SimpleEventProducer;
import freenet.crypt.RandomSource;
import freenet.support.BucketFactory;
+import freenet.support.io.NullPersistentFileTracker;
+import freenet.support.io.PersistentFileTracker;
/** Context object for an insert operation, including both simple and
multi-file inserts */
public class InserterContext {
public final BucketFactory bf;
public final BucketFactory persistentBucketFactory;
+ public final PersistentFileTracker persistentFileTracker;
/** If true, don't try to compress the data */
public boolean dontCompress;
public final RandomSource random;
@@ -25,10 +28,11 @@
public final boolean cacheLocalRequests;
public final USKManager uskManager;
- public InserterContext(BucketFactory bf, BucketFactory persistentBF,
RandomSource random,
+ public InserterContext(BucketFactory bf, BucketFactory persistentBF,
PersistentFileTracker tracker, RandomSource random,
int maxRetries, int rnfsToSuccess, int maxThreads, int
splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
ClientEventProducer eventProducer, boolean
cacheLocalRequests, USKManager uskManager) {
this.bf = bf;
+ this.persistentFileTracker = tracker;
this.persistentBucketFactory = persistentBF;
this.uskManager = uskManager;
this.random = random;
@@ -44,6 +48,7 @@
}
public InserterContext(InserterContext ctx, SimpleEventProducer
producer, boolean forceNonPersistent) {
+ this.persistentFileTracker = forceNonPersistent ? new
NullPersistentFileTracker() : ctx.persistentFileTracker;
this.uskManager = ctx.uskManager;
this.bf = ctx.bf;
this.persistentBucketFactory = forceNonPersistent ? ctx.bf :
ctx.persistentBucketFactory;
@@ -60,6 +65,7 @@
}
public InserterContext(InserterContext ctx, SimpleEventProducer
producer) {
+ this.persistentFileTracker = ctx.persistentFileTracker;
this.uskManager = ctx.uskManager;
this.bf = ctx.bf;
this.persistentBucketFactory = ctx.persistentBucketFactory;
Modified: trunk/freenet/src/freenet/client/async/ClientPutter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientPutter.java 2006-07-14
19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/async/ClientPutter.java 2006-07-14
19:52:30 UTC (rev 9608)
@@ -24,6 +24,9 @@
private final boolean getCHKOnly;
private final boolean isMetadata;
private FreenetURI uri;
+ /** SimpleFieldSet containing progress information from last startup.
+ * Will be progressively cleared during startup. */
+ private final SimpleFieldSet oldProgress;
/**
* @param client The object to call back when we complete, or don't.
@@ -36,10 +39,12 @@
* @param getCHKOnly
* @param isMetadata
* @param clientContext The client object for purposs of round-robin
client balancing.
+ * @param stored The progress so far, stored as a SimpleFieldSet.
Advisory; if there
+ * is an error reading this in, we will restart from scratch.
*/
public ClientPutter(ClientCallback client, Bucket data, FreenetURI
targetURI, ClientMetadata cm, InserterContext ctx,
ClientRequestScheduler chkScheduler,
ClientRequestScheduler sskScheduler, short priorityClass, boolean getCHKOnly,
- boolean isMetadata, Object clientContext) {
+ boolean isMetadata, Object clientContext,
SimpleFieldSet stored) {
super(priorityClass, chkScheduler, sskScheduler, clientContext);
this.cm = cm;
this.isMetadata = isMetadata;
@@ -50,6 +55,7 @@
this.ctx = ctx;
this.finished = false;
this.cancelled = false;
+ this.oldProgress = stored;
}
public void start() throws InserterException {
@@ -58,7 +64,7 @@
currentState =
new SingleFileInserter(this, this, new
InsertBlock(data, cm, targetURI), isMetadata, ctx, false, getCHKOnly, false,
null, false);
}
- ((SingleFileInserter)currentState).start();
+ ((SingleFileInserter)currentState).start(oldProgress);
} catch (InserterException e) {
synchronized(this) {
finished = true;
Added: trunk/freenet/src/freenet/client/async/ResumeException.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ResumeException.java 2006-07-14
19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/async/ResumeException.java 2006-07-14
19:52:30 UTC (rev 9608)
@@ -0,0 +1,19 @@
+package freenet.client.async;
+
+import freenet.support.io.CannotCreateFromFieldSetException;
+
+/**
+ * Thrown when the resuming of a request from a SimpleFieldSet fails. If this
happens
+ * then we simply restart the request from the beginning.
+ */
+public class ResumeException extends Exception {
+
+ public ResumeException(String msg) {
+ super(msg);
+ }
+
+ public ResumeException(String msg, CannotCreateFromFieldSetException e)
{
+ super(msg+" : "+e, e);
+ }
+
+}
Modified: trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -69,7 +69,7 @@
public void start() throws InserterException {
if((origSFI == null) && (metadata != null)) return;
- origSFI.start();
+ origSFI.start(null);
origSFI = null;
}
@@ -415,7 +415,7 @@
new SingleFileInserter(this, this, block,
isMetadata, ctx, false, getCHKOnly, false, baseMetadata,
insertAsArchiveManifest);
Logger.minor(this, "Inserting main metadata:
"+metadataInserter);
this.metadataPuttersByMetadata.put(baseMetadata,
metadataInserter);
- metadataInserter.start();
+ metadataInserter.start(null);
} catch (InserterException e) {
fail(e);
}
@@ -441,7 +441,7 @@
synchronized(this) {
this.metadataPuttersByMetadata.put(m,
metadataInserter);
}
- metadataInserter.start();
+ metadataInserter.start(null);
} catch (MetadataUnresolvedException e1) {
resolve(e1);
}
Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileInserter.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -43,7 +43,7 @@
final boolean insertAsArchiveManifest;
/** If true, we are not the top level request, and should not
* update our parent to point to us as current put-stage. */
- private boolean reportMetadataOnly;
+ private final boolean reportMetadataOnly;
public final Object token;
/**
@@ -72,7 +72,22 @@
this.insertAsArchiveManifest = insertAsArchiveManifest;
}
- public void start() throws InserterException {
+ public void start(SimpleFieldSet fs) throws InserterException {
+ if(fs != null) {
+ String type = fs.get("Type");
+ if(type.equals("SplitHandler")) {
+ // Try to reconstruct SplitHandler.
+ // If we succeed, we bypass both compression
and FEC encoding!
+ try {
+ SplitHandler sh = new SplitHandler();
+ sh.start(fs);
+ cb.onTransition(this, sh);
+ return;
+ } catch (ResumeException e) {
+ Logger.error(this, "Failed to restore:
"+e, e);
+ }
+ }
+ }
if(block.getData().size() > COMPRESS_OFF_THREAD_LIMIT) {
// Run off thread
OffThreadCompressor otc = new OffThreadCompressor();
@@ -263,6 +278,42 @@
boolean splitInsertSetBlocks;
boolean metaInsertSetBlocks;
+ /**
+ * Create a SplitHandler from a stored progress SimpleFieldSet.
+ * @throws ResumeException Thrown if the resume fails.
+ * @throws InserterException Thrown if some other error
prevents the insert
+ * from starting.
+ */
+ void start(SimpleFieldSet fs) throws ResumeException,
InserterException {
+
+ // FIXME: Include the booleans?
+
+ SimpleFieldSet sfiFS = fs.subset("SplitFileInserter");
+ if(sfiFS == null)
+ throw new ResumeException("No
SplitFileInserter");
+ sfi = new SplitFileInserter(parent, this,
block.clientMetadata, ctx, getCHKOnly, metadata, token,
insertAsArchiveManifest, sfiFS);
+ SimpleFieldSet metaFS = fs.subset("MetadataPutter");
+ if(metaFS != null) {
+ String type = metaFS.get("Type");
+ if(type.equals("SplitFileInserter")) {
+ metadataPutter =
+ new SplitFileInserter(parent,
this, block.clientMetadata, ctx, getCHKOnly, metadata, token,
insertAsArchiveManifest, metaFS);
+ } else if(type.equals("SplitHandler")) {
+ metadataPutter = new SplitHandler();
+
((SplitHandler)metadataPutter).start(metaFS);
+ }
+ }
+
+ sfi.schedule();
+ if(metadataPutter != null) {
+ metadataPutter.schedule();
+ }
+ }
+
+ public SplitHandler() {
+ // Default constructor
+ }
+
public synchronized void onTransition(ClientPutState oldState,
ClientPutState newState) {
if(oldState == sfi)
sfi = newState;
@@ -310,6 +361,8 @@
cb.onMetadata(meta, this);
metaInsertSuccess = true;
} else {
+ if(metadataPutter != null)
+ return;
Bucket metadataBucket;
try {
metadataBucket =
BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
@@ -334,7 +387,7 @@
}
try {
-
((SingleFileInserter)metadataPutter).start();
+
((SingleFileInserter)metadataPutter).start(null);
} catch (InserterException e) {
fail(e);
return;
@@ -409,7 +462,7 @@
}
public void schedule() throws InserterException {
- start();
+ start(null);
}
public Object getToken() {
Modified: trunk/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserter.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserter.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -13,6 +13,7 @@
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
import freenet.support.BucketTools;
+import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
import freenet.support.compress.Compressor;
@@ -45,7 +46,10 @@
fs.put("Type", "SplitFileInserter");
fs.put("DataLength", Long.toString(dataLength));
fs.put("CompressionCodec", Short.toString(compressionCodec));
+ fs.put("SplitfileCodec", Short.toString(splitfileAlgorithm));
fs.put("Finished", Boolean.toString(finished));
+ fs.put("SegmentSize", Integer.toString(segmentSize));
+ fs.put("CheckSegmentSize", Integer.toString(checkSegmentSize));
SimpleFieldSet segs = new SimpleFieldSet(true);
for(int i=0;i<segments.length;i++) {
segs.put(Integer.toString(i),
segments[i].getProgressFieldset());
@@ -90,6 +94,79 @@
countCheckBlocks = count;
}
+ public SplitFileInserter(BaseClientPutter parent, PutCompletionCallback
cb, ClientMetadata clientMetadata, InserterContext ctx, boolean getCHKOnly,
boolean metadata, Object token, boolean insertAsArchiveManifest, SimpleFieldSet
fs) throws ResumeException {
+ this.parent = parent;
+ this.insertAsArchiveManifest = insertAsArchiveManifest;
+ this.token = token;
+ this.finished = false;
+ this.isMetadata = metadata;
+ this.cm = clientMetadata;
+ this.getCHKOnly = getCHKOnly;
+ this.cb = cb;
+ this.ctx = ctx;
+ finished = Fields.stringToBool(fs.get("Finished"), false);
+ String length = fs.get("DataLength");
+ if(length == null) throw new ResumeException("No DataLength");
+ try {
+ dataLength = Long.parseLong(length);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt DataLength: "+e+" :
"+length);
+ }
+ String tmp = fs.get("SegmentSize");
+ if(length == null) throw new ResumeException("No SegmentSize");
+ try {
+ segmentSize = Integer.parseInt(tmp);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt SegmentSize: "+e+" :
"+length);
+ }
+ tmp = fs.get("CheckSegmentSize");
+ if(length == null) throw new ResumeException("No
CheckSegmentSize");
+ try {
+ checkSegmentSize = Integer.parseInt(tmp);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt CheckSegmentSize:
"+e+" : "+length);
+ }
+ String ccodec = fs.get("CompressionCodec");
+ if(ccodec == null) throw new ResumeException("No compression
codec");
+ try {
+ compressionCodec = Short.parseShort(ccodec);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt CompressionCodec:
"+e+" : "+ccodec);
+ }
+ String scodec = fs.get("CompressionCodec");
+ if(scodec == null) throw new ResumeException("No compression
codec");
+ try {
+ splitfileAlgorithm = Short.parseShort(scodec);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt SplitfileCodec:
"+e+" : "+scodec);
+ }
+ SimpleFieldSet segFS = fs.subset("Segments");
+ if(segFS == null) throw new ResumeException("No segments");
+ String segc = segFS.get("Count");
+ if(segc == null) throw new ResumeException("No segment count");
+ int segmentCount;
+ try {
+ segmentCount = Integer.parseInt(segc);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt segment count: "+e+"
: "+segc);
+ }
+ segments = new SplitFileInserterSegment[segmentCount];
+
+ int dataBlocks = 0;
+ int checkBlocks = 0;
+
+ for(int i=0;i<segments.length;i++) {
+ SimpleFieldSet segment =
segFS.subset(Integer.toString(i));
+ if(segment == null) throw new ResumeException("No
segment "+i);
+ segments[i] = new SplitFileInserterSegment(this,
segment, splitfileAlgorithm, ctx, getCHKOnly, i);
+ dataBlocks += segments[i].countDataBlocks();
+ checkBlocks += segments[i].countCheckBlocks();
+ }
+
+ this.countDataBlocks = dataBlocks;
+ this.countCheckBlocks = checkBlocks;
+ }
+
/**
* Group the blocks into segments.
*/
@@ -129,8 +206,14 @@
public void start() throws InserterException {
for(int i=0;i<segments.length;i++)
segments[i].start();
+
+ if(finished) {
+ // FIXME call callback with metadata etc
+ }
+
if(countDataBlocks > 32)
parent.onMajorProgress();
+
}
public void encodedSegment(SplitFileInserterSegment segment) {
Modified: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -1,6 +1,7 @@
package freenet.client.async;
import java.io.IOException;
+import java.net.MalformedURLException;
import freenet.client.FECCodec;
import freenet.client.FailureCodeTracker;
@@ -11,12 +12,13 @@
import freenet.keys.CHKBlock;
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
-import freenet.support.HexUtil;
+import freenet.support.Fields;
import freenet.support.Logger;
-import freenet.support.PaddedEphemerallyEncryptedBucket;
import freenet.support.SimpleFieldSet;
-import freenet.support.io.FileBucket;
+import freenet.support.io.CannotCreateFromFieldSetException;
+import freenet.support.io.PersistentTempBucketFactory;
import freenet.support.io.SerializableToFieldSetBucket;
+import freenet.support.io.SerializableToFieldSetBucketUtil;
public class SplitFileInserterSegment implements PutCompletionCallback {
@@ -58,6 +60,138 @@
this.segNo = segNo;
}
+ /** Resume an insert segment
+ * @throws ResumeException */
+ public SplitFileInserterSegment(SplitFileInserter parent,
SimpleFieldSet fs, short splitfileAlgorithm, InserterContext ctx, boolean
getCHKOnly, int segNo) throws ResumeException {
+ this.parent = parent;
+ this.getCHKOnly = getCHKOnly;
+ this.blockInsertContext = ctx;
+ this.segNo = segNo;
+ if(!"SplitFileInserterSegment".equals(fs.get("Type")))
+ throw new ResumeException("Wrong Type:
"+fs.get("Type"));
+ finished = Fields.stringToBool(fs.get("Finished"), false);
+ encoded = Fields.stringToBool(fs.get("Encoded"), false);
+ started = Fields.stringToBool(fs.get("Started"), false);
+ SimpleFieldSet errorsFS = fs.subset("Errors");
+ if(errorsFS != null)
+ this.errors = new FailureCodeTracker(true, errorsFS);
+ else
+ this.errors = new FailureCodeTracker(true);
+ if(finished && !errors.isEmpty())
+ toThrow = InserterException.construct(errors);
+ blocksGotURI = 0;
+ blocksCompleted = 0;
+ SimpleFieldSet dataFS = fs.subset("DataBlocks");
+ if(dataFS == null)
+ throw new ResumeException("No data blocks");
+ String tmp = dataFS.get("Count");
+ if(tmp == null) throw new ResumeException("No data block
count");
+ int dataBlockCount;
+ try {
+ dataBlockCount = Integer.parseInt(tmp);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt data blocks count:
"+e+" : "+tmp);
+ }
+
+ hasURIs = true;
+
+ dataBlocks = new Bucket[dataBlockCount];
+ dataURIs = new FreenetURI[dataBlockCount];
+ dataBlockInserters = new SingleBlockInserter[dataBlockCount];
+ for(int i=0;i<dataBlockCount;i++) {
+ SimpleFieldSet blockFS =
dataFS.subset(Integer.toString(i));
+ if(blockFS == null) throw new ResumeException("No data
block "+i+" on segment "+segNo);
+ tmp = blockFS.get("URI");
+ if(tmp != null) {
+ try {
+ dataURIs[i] = new FreenetURI(tmp);
+ blocksGotURI++;
+ } catch (MalformedURLException e) {
+ throw new ResumeException("Corrupt URI:
"+e+" : "+tmp);
+ }
+ } else hasURIs = false;
+ boolean blockFinished =
Fields.stringToBool(blockFS.get("Finished"), false);
+ if(blockFinished && dataURIs[i] == null)
+ throw new ResumeException("Block "+i+" of
"+segNo+" finished but no URI");
+ if(blockFinished && !encoded)
+ throw new ResumeException("Block "+i+" of
"+segNo+" finished but not encoded");
+ if(!blockFinished) {
+ // Read data
+ SimpleFieldSet bucketFS =
blockFS.subset("Data");
+ if(bucketFS == null)
+ throw new ResumeException("Block "+i+"
of "+segNo+" not finished but no data");
+ try {
+ dataBlocks[i] =
SerializableToFieldSetBucketUtil.create(bucketFS, ctx.random,
ctx.persistentFileTracker);
+ } catch (CannotCreateFromFieldSetException e) {
+ throw new ResumeException("Failed to
deserialize block "+i+" of "+segNo+" : "+e, e);
+ }
+ if(dataBlocks[i] == null)
+ throw new ResumeException("Block "+i+"
of "+segNo+" not finished but no data (create returned null)");
+ // Don't create fetcher yet; that happens in
start()
+ } else blocksCompleted++;
+ }
+
+ SimpleFieldSet checkFS = fs.subset("CheckBlocks");
+ if(checkFS != null) {
+ tmp = checkFS.get("Count");
+ if(tmp == null) throw new ResumeException("Check blocks
but no check block count");
+ int checkBlockCount;
+ try {
+ checkBlockCount = Integer.parseInt(tmp);
+ } catch (NumberFormatException e) {
+ throw new ResumeException("Corrupt check blocks
count: "+e+" : "+tmp);
+ }
+ checkBlocks = new Bucket[checkBlockCount];
+ checkURIs = new FreenetURI[checkBlockCount];
+ checkBlockInserters = new
SingleBlockInserter[checkBlockCount];
+ for(int i=0;i<checkBlockCount;i++) {
+ SimpleFieldSet blockFS =
checkFS.subset(Integer.toString(i));
+ if(blockFS == null) {
+ if(encoded) throw new
ResumeException("No check block "+i+" of "+segNo);
+ else continue;
+ }
+ tmp = blockFS.get("URI");
+ if(tmp != null) {
+ try {
+ checkURIs[i] = new
FreenetURI(tmp);
+ blocksGotURI++;
+ } catch (MalformedURLException e) {
+ throw new
ResumeException("Corrupt URI: "+e+" : "+tmp);
+ }
+ } else hasURIs = false;
+ boolean blockFinished =
Fields.stringToBool(blockFS.get("Finished"), false);
+ if(blockFinished && checkURIs[i] == null)
+ throw new ResumeException("Check block
"+i+" of "+segNo+" finished but no URI");
+ if(blockFinished && !encoded)
+ throw new ResumeException("Check block
"+i+" of "+segNo+" finished but not encoded");
+ if(!blockFinished) {
+ // Read data
+ SimpleFieldSet bucketFS =
blockFS.subset("Data");
+ if(bucketFS == null)
+ throw new
ResumeException("Check block "+i+" of "+segNo+" not finished but no data");
+ try {
+ checkBlocks[i] =
SerializableToFieldSetBucketUtil.create(bucketFS, ctx.random,
ctx.persistentFileTracker);
+ } catch
(CannotCreateFromFieldSetException e) {
+ throw new
ResumeException("Failed to deserialize check block "+i+" of "+segNo+" : "+e, e);
+ }
+ if(checkBlocks[i] == null)
+ throw new
ResumeException("Check block "+i+" of "+segNo+" not finished but no data
(create returned null)");
+ // Don't create fetcher yet; that happens in
start()
+ } else blocksCompleted++;
+ }
+ splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm,
dataBlockCount, checkBlocks.length);
+ } else {
+ splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm,
dataBlockCount);
+ int checkBlocksCount = splitfileAlgo.countCheckBlocks();
+ this.checkURIs = new FreenetURI[checkBlocksCount];
+ this.checkBlocks = new Bucket[checkBlocksCount];
+ this.checkBlockInserters = new
SingleBlockInserter[checkBlocksCount];
+ hasURIs = false;
+ }
+ parent.parent.addBlocks(dataURIs.length+checkURIs.length);
+
parent.parent.addMustSucceedBlocks(dataURIs.length+checkURIs.length);
+ }
+
public synchronized SimpleFieldSet getProgressFieldset() {
SimpleFieldSet fs = new SimpleFieldSet(true);
fs.put("Type", "SplitFileInserterSegment");
@@ -80,7 +214,6 @@
if(started) {
block.put("Finished", finished);
}
- if(finished) continue;
if(!finished) {
Bucket data = dataBlocks[i];
if(data instanceof
SerializableToFieldSetBucket) {
@@ -100,7 +233,7 @@
}
fs.put("DataBlocks", dataFS);
SimpleFieldSet checkFS = new SimpleFieldSet(true);
- checkFS.put("Count", Integer.toString(dataBlocks.length));
+ checkFS.put("Count", Integer.toString(checkBlocks.length));
for(int i=0;i<checkBlocks.length;i++) {
SimpleFieldSet block = new SimpleFieldSet(true);
if(checkURIs[i] != null)
@@ -131,17 +264,31 @@
public void start() throws InserterException {
for(int i=0;i<dataBlockInserters.length;i++) {
- dataBlockInserters[i] =
- new SingleBlockInserter(parent.parent,
dataBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
false, CHKBlock.DATA_LENGTH, i, getCHKOnly, false, false, parent.token);
- dataBlockInserters[i].schedule();
+ if(dataBlocks[i] != null) { // else already finished on
creation
+ dataBlockInserters[i] =
+ new SingleBlockInserter(parent.parent,
dataBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
false, CHKBlock.DATA_LENGTH, i, getCHKOnly, false, false, parent.token);
+ dataBlockInserters[i].schedule();
+ } else {
+ parent.parent.completedBlock(true);
+ }
}
+ parent.parent.notifyClients();
started = true;
- if(splitfileAlgo != null) {
+ if(splitfileAlgo != null && !encoded) {
// Encode blocks
Thread t = new Thread(new EncodeBlocksRunnable(),
"Blocks encoder");
t.setDaemon(true);
t.start();
}
+ if(encoded) {
+ parent.encodedSegment(this);
+ }
+ if(hasURIs) {
+ parent.segmentHasURIs(this);
+ }
+ if(finished) {
+ parent.segmentFinished(this);
+ }
}
private class EncodeBlocksRunnable implements Runnable {
@@ -156,9 +303,13 @@
splitfileAlgo.encode(dataBlocks, checkBlocks,
CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory);
// Start the inserts
for(int i=0;i<checkBlockInserters.length;i++) {
- checkBlockInserters[i] =
- new SingleBlockInserter(parent.parent,
checkBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
false, CHKBlock.DATA_LENGTH, i + dataBlocks.length, getCHKOnly, false, false,
parent.token);
- checkBlockInserters[i].schedule();
+ if(checkBlocks[i] != null) { // else already
finished on creation
+ checkBlockInserters[i] =
+ new
SingleBlockInserter(parent.parent, checkBlocks[i], (short)-1,
FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, false,
CHKBlock.DATA_LENGTH, i + dataBlocks.length, getCHKOnly, false, false,
parent.token);
+ checkBlockInserters[i].schedule();
+ } else {
+ parent.parent.completedBlock(true);
+ }
}
// Tell parent only after have started the inserts.
// Because of the counting.
@@ -280,7 +431,12 @@
public int countCheckBlocks() {
return checkBlocks.length;
}
+
+ public int countDataBlocks() {
+ return dataBlocks.length;
+ }
+
public FreenetURI[] getCheckURIs() {
return checkURIs;
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-07-14 19:04:15 UTC (rev
9607)
+++ trunk/freenet/src/freenet/node/Node.java 2006-07-14 19:52:30 UTC (rev
9608)
@@ -239,7 +239,7 @@
inserter = new ClientPutter(this, b, uri,
new
ClientMetadata("text/plain") /* it won't quite fit in an SSK anyway */,
Node.this.makeClient((short)0).getInserterContext(true),
- chkPutScheduler,
sskPutScheduler, RequestStarter.INTERACTIVE_PRIORITY_CLASS, false, false, this);
+ chkPutScheduler,
sskPutScheduler, RequestStarter.INTERACTIVE_PRIORITY_CLASS, false, false, this,
null);
try {
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-14 19:04:15 UTC (rev
9607)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-14 19:52:30 UTC (rev
9608)
@@ -18,7 +18,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 879;
+ private static final int buildNumber = 880;
/** Oldest build of Fred we will talk to */
private static final int oldLastGoodBuild = 870;
Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-07-14 19:04:15 UTC
(rev 9607)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-07-14 19:52:30 UTC
(rev 9608)
@@ -70,7 +70,7 @@
Logger.minor(this, "data = "+data+", uploadFrom =
"+ClientPutMessage.uploadFromString(uploadFrom));
inserter = new ClientPutter(this, data, uri, cm,
ctx, client.node.chkPutScheduler,
client.node.sskPutScheduler, priorityClass,
- getCHKOnly, isMetadata, client);
+ getCHKOnly, isMetadata, client, null);
if((persistenceType != PERSIST_CONNECTION) && (handler != null))
sendPendingMessages(handler.outputHandler, true, false,
false);
}
@@ -150,7 +150,7 @@
}
this.clientMetadata = cm;
inserter = new ClientPutter(this, data, uri, cm, ctx,
client.node.chkPutScheduler,
- client.node.sskPutScheduler, priorityClass,
getCHKOnly, isMetadata, client);
+ client.node.sskPutScheduler, priorityClass,
getCHKOnly, isMetadata, client, fs.subset("progress"));
if(!finished)
start();
}
Modified:
trunk/freenet/src/freenet/support/PaddedEphemerallyEncryptedBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/PaddedEphemerallyEncryptedBucket.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/PaddedEphemerallyEncryptedBucket.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -10,7 +10,11 @@
import freenet.crypt.RandomSource;
import freenet.crypt.UnsupportedCipherException;
import freenet.crypt.ciphers.Rijndael;
+import freenet.support.io.CannotCreateFromFieldSetException;
+import freenet.support.io.PersistentFileTracker;
+import freenet.support.io.PersistentTempBucketFactory;
import freenet.support.io.SerializableToFieldSetBucket;
+import freenet.support.io.SerializableToFieldSetBucketUtil;
/**
* A proxy Bucket which adds:
@@ -90,6 +94,42 @@
lastOutputStream = 0;
}
+ public PaddedEphemerallyEncryptedBucket(SimpleFieldSet fs, RandomSource
origRandom, PersistentFileTracker f) throws CannotCreateFromFieldSetException {
+ this.origRandom = origRandom;
+ String tmp = fs.get("DataLength");
+ if(tmp == null)
+ throw new CannotCreateFromFieldSetException("No
DataLength");
+ try {
+ dataLength = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ throw new CannotCreateFromFieldSetException("Corrupt
dataLength: "+tmp, e);
+ }
+ SimpleFieldSet underlying = fs.subset("Underlying");
+ if(underlying == null)
+ throw new CannotCreateFromFieldSetException("No
underlying bucket");
+ bucket = SerializableToFieldSetBucketUtil.create(underlying,
origRandom, f);
+ tmp = fs.get("DecryptKey");
+ if(tmp == null)
+ throw new CannotCreateFromFieldSetException("No key");
+ key = HexUtil.hexToBytes(tmp);
+ try {
+ aes = new Rijndael(256, 256);
+ } catch (UnsupportedCipherException e) {
+ throw new Error(e);
+ }
+ aes.initialize(key);
+ tmp = fs.get("MinPaddedSize");
+ if(tmp == null)
+ minPaddedSize = 1024; // FIXME throw! back
compatibility hack
+ else {
+ try {
+ minPaddedSize = Integer.parseInt(tmp);
+ } catch (NumberFormatException e) {
+ throw new
CannotCreateFromFieldSetException("Corrupt dataLength: "+tmp, e);
+ }
+ }
+ }
+
public OutputStream getOutputStream() throws IOException {
if(readOnly) throw new IOException("Read only");
OutputStream os = bucket.getOutputStream();
@@ -312,6 +352,7 @@
Logger.error(this, "Cannot serialize underlying bucket:
"+bucket);
return null;
}
+ fs.put("MinPaddedSize", minPaddedSize);
return fs;
}
Modified: trunk/freenet/src/freenet/support/RandomAccessFileBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/RandomAccessFileBucket.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/RandomAccessFileBucket.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -8,6 +8,9 @@
import java.io.RandomAccessFile;
import java.util.Vector;
+import freenet.support.io.CannotCreateFromFieldSetException;
+import freenet.support.io.PersistentFileTracker;
+import freenet.support.io.PersistentTempBucketFactory;
import freenet.support.io.SerializableToFieldSetBucket;
/**
@@ -17,6 +20,14 @@
**/
public class RandomAccessFileBucket implements Bucket,
SerializableToFieldSetBucket {
+ private final File file;
+ private long offset = -1;
+ private long localOffset = 0;
+ private long len = -1;
+ private boolean readOnly = false;
+ private boolean released = false;
+ private Vector streams = new Vector();
+
public RandomAccessFileBucket(File file, long offset, long len, boolean
readOnly)
throws IOException {
if (!(file.exists() && file.canRead())) {
@@ -32,7 +43,27 @@
setRange(offset, len);
}
- public synchronized void setRange(long offset, long len) throws
IOException {
+ public RandomAccessFileBucket(SimpleFieldSet fs, PersistentFileTracker f)
throws CannotCreateFromFieldSetException {
+ String tmp = fs.get("Filename");
+ if(tmp == null) throw new CannotCreateFromFieldSetException("No
filename");
+ this.file = new File(tmp);
+ tmp = fs.get("Length");
+ if(tmp == null) throw new CannotCreateFromFieldSetException("No
length");
+ try {
+ len = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ throw new CannotCreateFromFieldSetException("Corrupt
length "+tmp, e);
+ }
+ tmp = fs.get("Offset");
+ if(tmp == null) throw new CannotCreateFromFieldSetException("No
offset");
+ try {
+ offset = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ throw new CannotCreateFromFieldSetException("Corrupt
offset "+tmp, e);
+ }
+ }
+
+ public synchronized void setRange(long offset, long len) throws
IOException {
if (isReleased()) {
throw new IOException("Attempt to use a released
RandomAccessFileBucket: " + getName() );
}
@@ -424,14 +455,6 @@
}
////////////////////////////////////////////////////////////
- private final File file;
- private long offset = -1;
- private long localOffset = 0;
- private long len = -1;
- private boolean readOnly = false;
- private boolean released = false;
- private Vector streams = new Vector();
-
public boolean isReadOnly() {
return readOnly;
}
Added:
trunk/freenet/src/freenet/support/io/CannotCreateFromFieldSetException.java
===================================================================
--- trunk/freenet/src/freenet/support/io/CannotCreateFromFieldSetException.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/io/CannotCreateFromFieldSetException.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -0,0 +1,13 @@
+package freenet.support.io;
+
+public class CannotCreateFromFieldSetException extends Exception {
+
+ public CannotCreateFromFieldSetException(String msg) {
+ super(msg);
+ }
+
+ public CannotCreateFromFieldSetException(String msg,
NumberFormatException e) {
+ super(msg+" : "+e, e);
+ }
+
+}
Modified: trunk/freenet/src/freenet/support/io/FileBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/io/FileBucket.java 2006-07-14
19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/io/FileBucket.java 2006-07-14
19:52:30 UTC (rev 9608)
@@ -82,6 +82,20 @@
file.deleteOnExit();
}
+ public FileBucket(SimpleFieldSet fs, PersistentFileTracker f) throws
CannotCreateFromFieldSetException {
+ String tmp = fs.get("Filename");
+ if(tmp == null) throw new CannotCreateFromFieldSetException("No
filename");
+ this.file = new File(tmp);
+ tmp = fs.get("Length");
+ if(tmp == null) throw new CannotCreateFromFieldSetException("No
length");
+ try {
+ length = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ throw new CannotCreateFromFieldSetException("Corrupt
length "+tmp, e);
+ }
+ f.register(file);
+ }
+
public OutputStream getOutputStream() throws IOException {
synchronized (this) {
if(readOnly)
Added: trunk/freenet/src/freenet/support/io/NullPersistentFileTracker.java
===================================================================
--- trunk/freenet/src/freenet/support/io/NullPersistentFileTracker.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/io/NullPersistentFileTracker.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -0,0 +1,11 @@
+package freenet.support.io;
+
+import java.io.File;
+
+public class NullPersistentFileTracker implements PersistentFileTracker {
+
+ public void register(File file) {
+ // Do nothing
+ }
+
+}
Added: trunk/freenet/src/freenet/support/io/PersistentFileTracker.java
===================================================================
--- trunk/freenet/src/freenet/support/io/PersistentFileTracker.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/io/PersistentFileTracker.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -0,0 +1,9 @@
+package freenet.support.io;
+
+import java.io.File;
+
+public interface PersistentFileTracker {
+
+ public void register(File file);
+
+}
Modified: trunk/freenet/src/freenet/support/io/PersistentTempBucketFactory.java
===================================================================
--- trunk/freenet/src/freenet/support/io/PersistentTempBucketFactory.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/io/PersistentTempBucketFactory.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -20,7 +20,7 @@
* Once startup is completed, any unclaimed temp buckets which match the
* temporary file pattern will be deleted.
*/
-public class PersistentTempBucketFactory implements BucketFactory {
+public class PersistentTempBucketFactory implements BucketFactory,
PersistentFileTracker {
/** Directory containing persistent temporary files */
private final File dir;
@@ -74,6 +74,10 @@
return b;
}
+ public void register(File file) {
+ originalFiles.remove(file);
+ }
+
/**
* Called when boot-up is complete.
* Deletes any old temp files still unclaimed.
Added:
trunk/freenet/src/freenet/support/io/SerializableToFieldSetBucketUtil.java
===================================================================
--- trunk/freenet/src/freenet/support/io/SerializableToFieldSetBucketUtil.java
2006-07-14 19:04:15 UTC (rev 9607)
+++ trunk/freenet/src/freenet/support/io/SerializableToFieldSetBucketUtil.java
2006-07-14 19:52:30 UTC (rev 9608)
@@ -0,0 +1,30 @@
+package freenet.support.io;
+
+import freenet.crypt.RandomSource;
+import freenet.support.Bucket;
+import freenet.support.PaddedEphemerallyEncryptedBucket;
+import freenet.support.RandomAccessFileBucket;
+import freenet.support.SimpleFieldSet;
+
+public class SerializableToFieldSetBucketUtil {
+
+ // FIXME use something other than ResumeException???
+
+ public static Bucket create(SimpleFieldSet fs, RandomSource random,
PersistentFileTracker f) throws CannotCreateFromFieldSetException {
+ if(fs == null) return null;
+ String type = fs.get("Type");
+ if(type == null) {
+ throw new CannotCreateFromFieldSetException("No type");
+ } else if(type.equals("FileBucket")) {
+ return new FileBucket(fs, f);
+ } else if(type.equals("PaddedEphemerallyEncryptedBucket")) {
+ return new PaddedEphemerallyEncryptedBucket(fs, random,
f);
+ } else if(type.equals("NullBucket")) {
+ return new NullBucket();
+ } else if(type.equals("RandomAccessFileBucket")) {
+ return new RandomAccessFileBucket(fs, f);
+ } else
+ throw new
CannotCreateFromFieldSetException("Unrecognized type "+type);
+ }
+
+}