Author: toad
Date: 2006-01-25 23:47:20 +0000 (Wed, 25 Jan 2006)
New Revision: 7930
Added:
trunk/freenet/src/freenet/client/FetchWaiter.java
trunk/freenet/src/freenet/client/PutWaiter.java
trunk/freenet/src/freenet/client/StartableSplitfileBlock.java
trunk/freenet/src/freenet/client/async/
trunk/freenet/src/freenet/client/async/BaseClientPutter.java
trunk/freenet/src/freenet/client/async/ClientCallback.java
trunk/freenet/src/freenet/client/async/ClientGetState.java
trunk/freenet/src/freenet/client/async/ClientGetter.java
trunk/freenet/src/freenet/client/async/ClientPutState.java
trunk/freenet/src/freenet/client/async/ClientPutter.java
trunk/freenet/src/freenet/client/async/ClientRequest.java
trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
trunk/freenet/src/freenet/client/async/GetCompletionCallback.java
trunk/freenet/src/freenet/client/async/MinimalSplitfileBlock.java
trunk/freenet/src/freenet/client/async/MultiPutCompletionCallback.java
trunk/freenet/src/freenet/client/async/PutCompletionCallback.java
trunk/freenet/src/freenet/client/async/RequestScheduler.java
trunk/freenet/src/freenet/client/async/SendableGet.java
trunk/freenet/src/freenet/client/async/SendableInsert.java
trunk/freenet/src/freenet/client/async/SendableRequest.java
trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SingleFileInserter.java
trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/SplitFileInserter.java
trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
trunk/freenet/src/freenet/node/fcp/URIGeneratedMessage.java
trunk/freenet/src/freenet/support/IntNumberedItem.java
trunk/freenet/src/freenet/support/NumberedItemComparator.java
trunk/freenet/src/freenet/support/RandomGrabArray.java
trunk/freenet/src/freenet/support/RandomGrabArrayItem.java
trunk/freenet/src/freenet/support/RandomGrabArrayWithInt.java
trunk/freenet/src/freenet/support/SimpleIntNumberedItemComparator.java
trunk/freenet/src/freenet/support/SortedVectorByNumber.java
Removed:
trunk/freenet/src/freenet/client/BlockFetcher.java
trunk/freenet/src/freenet/client/BlockInserter.java
trunk/freenet/src/freenet/client/Fetcher.java
trunk/freenet/src/freenet/client/FileInserter.java
trunk/freenet/src/freenet/client/InsertSegment.java
trunk/freenet/src/freenet/client/MultiFileInserter.java
trunk/freenet/src/freenet/client/RetryTracker.java
trunk/freenet/src/freenet/client/RetryTrackerCallback.java
trunk/freenet/src/freenet/client/Segment.java
trunk/freenet/src/freenet/client/SplitFetcher.java
trunk/freenet/src/freenet/client/SplitInserter.java
trunk/freenet/src/freenet/client/StdSplitfileBlock.java
trunk/freenet/src/freenet/client/async/BaseClientPutter.java
trunk/freenet/src/freenet/client/async/ClientCallback.java
trunk/freenet/src/freenet/client/async/ClientGetState.java
trunk/freenet/src/freenet/client/async/ClientGetter.java
trunk/freenet/src/freenet/client/async/ClientPutState.java
trunk/freenet/src/freenet/client/async/ClientPutter.java
trunk/freenet/src/freenet/client/async/ClientRequest.java
trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
trunk/freenet/src/freenet/client/async/GetCompletionCallback.java
trunk/freenet/src/freenet/client/async/MinimalSplitfileBlock.java
trunk/freenet/src/freenet/client/async/MultiPutCompletionCallback.java
trunk/freenet/src/freenet/client/async/PutCompletionCallback.java
trunk/freenet/src/freenet/client/async/RequestScheduler.java
trunk/freenet/src/freenet/client/async/SendableGet.java
trunk/freenet/src/freenet/client/async/SendableInsert.java
trunk/freenet/src/freenet/client/async/SendableRequest.java
trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SingleFileInserter.java
trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/SplitFileInserter.java
trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
trunk/freenet/src/freenet/node/QueuedDataRequest.java
trunk/freenet/src/freenet/node/QueuedInsertRequest.java
trunk/freenet/src/freenet/node/QueuedRequest.java
trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
trunk/freenet/src/freenet/node/RequestStarterClient.java
trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
Modified:
trunk/freenet/src/freenet/client/ArchiveHandler.java
trunk/freenet/src/freenet/client/ArchiveStoreContext.java
trunk/freenet/src/freenet/client/ClientMetadata.java
trunk/freenet/src/freenet/client/FECCodec.java
trunk/freenet/src/freenet/client/FailureCodeTracker.java
trunk/freenet/src/freenet/client/FetchException.java
trunk/freenet/src/freenet/client/FetcherContext.java
trunk/freenet/src/freenet/client/HighLevelSimpleClient.java
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/InsertBlock.java
trunk/freenet/src/freenet/client/InserterContext.java
trunk/freenet/src/freenet/client/InserterException.java
trunk/freenet/src/freenet/client/Metadata.java
trunk/freenet/src/freenet/client/SplitfileBlock.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/keys/Key.java
trunk/freenet/src/freenet/node/LowLevelGetException.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/TextModeClientInterface.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/node/fcp/ClientGet.java
trunk/freenet/src/freenet/node/fcp/ClientPut.java
trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
trunk/freenet/src/freenet/node/fcp/PutFailedMessage.java
trunk/freenet/src/freenet/support/NumberedRecentItems.java
trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
Log:
381: (mandatory)
Merge client-async into trunk.
Practical results:
- More efficient with large numbers of parallel requests (far fewer threads)
- Future possibility of restartable queued downloads
- Some changes to FCP
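
The thread saving comes from inverting control: instead of one blocking thread
per request, each request becomes a passive object that a shared scheduler
drives from a small, fixed pool of starter threads. The sketch below is
illustrative only; every class and method name in it is hypothetical and does
not match the actual signatures of the new freenet.client.async classes.

    import java.util.LinkedList;

    // Hypothetical sketch of the scheduler idea, not the real Freenet API.
    interface SendableRequest {
        void send(); // perform one attempt; completion is reported via callbacks
    }

    class RequestScheduler {
        private final LinkedList<SendableRequest> pending = new LinkedList<SendableRequest>();

        synchronized void register(SendableRequest req) {
            pending.add(req);
            notifyAll();
        }

        // A small fixed pool of starter threads replaces one thread per request.
        void startWorkers(int nThreads) {
            for (int i = 0; i < nThreads; i++) {
                Thread t = new Thread(new Runnable() {
                    public void run() { workLoop(); }
                }, "starter-" + i);
                t.setDaemon(true);
                t.start();
            }
        }

        private void workLoop() {
            while (true) {
                SendableRequest req;
                synchronized (this) {
                    while (pending.isEmpty()) {
                        try { wait(); } catch (InterruptedException e) { return; }
                    }
                    req = pending.removeFirst();
                }
                req.send(); // does not block the pool waiting for completion
            }
        }
    }

With thousands of queued requests this needs only nThreads threads plus the
queue, which is also what makes persisting and restarting the queue feasible.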
Modified: trunk/freenet/src/freenet/client/ArchiveHandler.java
===================================================================
--- trunk/freenet/src/freenet/client/ArchiveHandler.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/ArchiveHandler.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -6,7 +6,7 @@
* The public face (to Fetcher, for example) of ArchiveStoreContext.
* Just has methods for fetching stuff.
*/
-interface ArchiveHandler {
+public interface ArchiveHandler {
/**
* Get the metadata for this ZIP manifest, as a Bucket.
@@ -14,7 +14,7 @@
 * @throws MetadataParseException If there was an error parsing intermediary metadata.
 */
 public abstract Bucket getMetadata(ArchiveContext archiveContext,
- FetcherContext fetchContext, ClientMetadata dm, int recursionLevel,
+ ClientMetadata dm, int recursionLevel,
 boolean dontEnterImplicitArchives)
 throws ArchiveFailureException, ArchiveRestartException, MetadataParseException, FetchException;
@@ -30,10 +30,15 @@
* @throws MetadataParseException
*/
public abstract Bucket get(String internalName,
- ArchiveContext archiveContext, FetcherContext fetchContext,
+ ArchiveContext archiveContext,
 ClientMetadata dm, int recursionLevel,
 boolean dontEnterImplicitArchives)
 throws ArchiveFailureException, ArchiveRestartException, MetadataParseException, FetchException;
+ /**
+ * Get the archive type.
+ */
+ public abstract short getArchiveType();
+
}
\ No newline at end of file
Modified: trunk/freenet/src/freenet/client/ArchiveStoreContext.java
===================================================================
--- trunk/freenet/src/freenet/client/ArchiveStoreContext.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/ArchiveStoreContext.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -12,8 +12,11 @@
* subject to the above.
*
 * Always take the lock on ArchiveStoreContext before the lock on ArchiveManager, NOT the other way around.
+ *
+ * Not normally to be used directly by external packages, but public for
+ * ArchiveManager.extractToCache. FIXME.
*/
-class ArchiveStoreContext implements ArchiveHandler {
+public class ArchiveStoreContext implements ArchiveHandler {
private ArchiveManager manager;
private FreenetURI key;
@@ -34,42 +37,29 @@
* Get the metadata for a given archive.
 * @return A Bucket containing the metadata, in binary format, for the archive.
 */
- public Bucket getMetadata(ArchiveContext archiveContext, FetcherContext fetchContext, ClientMetadata dm, int recursionLevel,
+ public Bucket getMetadata(ArchiveContext archiveContext, ClientMetadata dm, int recursionLevel,
 boolean dontEnterImplicitArchives) throws ArchiveFailureException, ArchiveRestartException, MetadataParseException, FetchException {
- return get(".metadata", archiveContext, fetchContext, dm, recursionLevel, dontEnterImplicitArchives);
+ return get(".metadata", archiveContext, dm, recursionLevel, dontEnterImplicitArchives);
}
/**
 * Fetch a file in an archive. Will check the cache first, then fetch the archive if
 * necessary.
 */
- public Bucket get(String internalName, ArchiveContext archiveContext, FetcherContext fetchContext, ClientMetadata dm, int recursionLevel,
+ public Bucket get(String internalName, ArchiveContext archiveContext, ClientMetadata dm, int recursionLevel,
 boolean dontEnterImplicitArchives) throws ArchiveFailureException, ArchiveRestartException, MetadataParseException, FetchException {
// Do loop detection on the archive that we are about to fetch.
archiveContext.doLoopDetection(key);
Bucket data;
-
+
// Fetch from cache
if((data = manager.getCached(key, internalName)) != null) {
return data;
}
- synchronized(this) {
- // Fetch from cache
- if((data = manager.getCached(key, internalName)) != null) {
- return data;
- }
-
- // Not in cache
-
- if(fetchContext == null) return null;
- Fetcher fetcher = new Fetcher(key, fetchContext, archiveContext);
- FetchResult result = fetcher.realRun(dm, recursionLevel, key, dontEnterImplicitArchives, fetchContext.localRequestOnly);
- manager.extractToCache(key, archiveType, result.data, archiveContext, this);
- return manager.getCached(key, internalName);
- }
+ return null;
}
// Archive size
@@ -130,5 +120,9 @@
myItems.remove(item);
}
}
+
+ public short getArchiveType() {
+ return archiveType;
+ }
}
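
Note the contract change above: get() is now cache-only and returns null on a
miss, instead of synchronously running a Fetcher while holding the context
lock; actually fetching the archive is now the job of the asynchronous request
code. A minimal sketch of the resulting caller pattern, with hypothetical
names rather than the real API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class ArchiveCacheView {
        private final Map<String, byte[]> cache = new ConcurrentHashMap<String, byte[]>();

        /** Cache-only lookup: null means "not cached, schedule an async fetch". */
        byte[] getCached(String internalName) {
            return cache.get(internalName);
        }

        void putCached(String internalName, byte[] data) {
            cache.put(internalName, data);
        }
    }

    class ArchiveCaller {
        static byte[] fetchEntry(ArchiveCacheView view, String name, Runnable scheduleArchiveFetch) {
            byte[] data = view.getCached(name);
            if (data == null) {
                // Do not block here; hand the archive fetch to the scheduler
                // and let a callback re-enter this lookup once it completes.
                scheduleArchiveFetch.run();
            }
            return data;
        }
    }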
Deleted: trunk/freenet/src/freenet/client/BlockFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/BlockFetcher.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/BlockFetcher.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,122 +0,0 @@
-/**
- *
- */
-package freenet.client;
-
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.Logger;
-
-public class BlockFetcher extends StdSplitfileBlock {
-
- private final Segment segment;
- final FreenetURI uri;
- final boolean dontEnterImplicitArchives;
- int completedTries;
- boolean actuallyFetched;
-
- public BlockFetcher(Segment segment, RetryTracker tracker, FreenetURI freenetURI, int index, boolean dontEnterImplicitArchives) {
- super(tracker, index, null);
- this.segment = segment;
- uri = freenetURI;
- completedTries = 0;
- fetchedData = null;
- actuallyFetched = false;
- this.dontEnterImplicitArchives = dontEnterImplicitArchives;
- }
-
- public String getName() {
- return "BlockFetcher for "+getNumber();
- }
-
- public void run() {
- Logger.minor(this, "Running: "+this);
- // Already added to runningFetches.
- // But need to make sure we are removed when we exit.
- try {
- realRun();
- } catch (Throwable t) {
- fatalError(t, FetchException.INTERNAL_ERROR);
- } finally {
- completedTries++;
- }
- }
-
- public String toString() {
- return super.toString()+" tries="+completedTries+" uri="+uri;
- }
-
- private void realRun() {
- // Do the fetch
- Fetcher f = new Fetcher(uri, this.segment.blockFetchContext);
- try {
- FetchResult fr = f.realRun(new ClientMetadata(), segment.recursionLevel, uri,
- (!this.segment.nonFullBlocksAllowed) || dontEnterImplicitArchives, segment.blockFetchContext.localRequestOnly || completedTries == 0);
- actuallyFetched = true;
- fetchedData = fr.data;
- Logger.minor(this, "Fetched "+fetchedData.size()+" bytes on "+this);
- tracker.success(this);
- } catch (MetadataParseException e) {
- fatalError(e, FetchException.INVALID_METADATA);
- } catch (FetchException e) {
- int code = e.getMode();
- boolean isFatal = e.isFatal();
- if(isFatal)
- fatalError(e, code);
- else
- nonfatalError(e, code);
- } catch (ArchiveFailureException e) {
- fatalError(e, FetchException.ARCHIVE_FAILURE);
- } catch (ArchiveRestartException e) {
- Logger.error(this, "Got an ArchiveRestartException in a
splitfile - WTF?");
- fatalError(e, FetchException.ARCHIVE_FAILURE);
- }
- }
-
- private void fatalError(Throwable e, int code) {
- Logger.error(this, "Giving up on block: "+this+": "+e, e);
- tracker.fatalError(this, code);
- }
-
- private void nonfatalError(Exception e, int code) {
- Logger.minor(this, "Non-fatal error on "+this+": "+e);
- tracker.nonfatalError(this, code);
- }
-
- public boolean succeeded() {
- return fetchedData != null;
- }
-
- /**
- * Queue a healing block for insert.
- * Will be implemented using the download manager.
- * FIXME: implement!
- */
- public void queueHeal() {
- // TODO Auto-generated method stub
-
- }
-
- public void kill() {
- // Do nothing, for now
- }
-
- public FreenetURI getURI() {
- return uri;
- }
-
- public void setData(Bucket data) {
- actuallyFetched = false;
- super.setData(data);
- }
-
- protected void checkStartable() {
- if(fetchedData != null) {
- throw new IllegalStateException("Already have data");
- }
- }
-
- public int getRetryCount() {
- return completedTries;
- }
-}
\ No newline at end of file
Deleted: trunk/freenet/src/freenet/client/BlockInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/BlockInserter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/BlockInserter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,150 +0,0 @@
-package freenet.client;
-
-import freenet.client.events.BlockInsertErrorEvent;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.Logger;
-
-/**
- * Inserts a single splitfile block.
- */
-public class BlockInserter extends StdSplitfileBlock implements Runnable {
-
- private boolean succeeded;
- private int completedTries;
- private final InserterContext ctx;
- private final InsertBlock block;
- private FreenetURI uri;
- private final boolean getCHKOnly;
- /** RNF count. We can count many consecutive RNFs as success. */
- private int rnfs;
-
- /**
- * Create a BlockInserter.
- * @param bucket The data to insert, or null if it will be filled in later.
- * @param num The block number in the splitfile.
- */
- public BlockInserter(Bucket bucket, int num, RetryTracker tracker, InserterContext ctx, boolean getCHKOnly) {
- super(tracker, num, bucket);
- succeeded = false;
- this.ctx = ctx;
- block = new InsertBlock(bucket, null, FreenetURI.EMPTY_CHK_URI);
- this.getCHKOnly = getCHKOnly;
- Logger.minor(this, "Created "+this);
- }
-
- public synchronized void setData(Bucket data) {
- if(this.fetchedData != null) throw new IllegalArgumentException("Cannot set data when already have data");
- block.data = data;
- super.setData(data);
- }
-
- public void kill() {
- // Do nothing, for now.
- }
-
- public String toString() {
- return super.toString()+" succeeded="+succeeded+"
tries="+completedTries+" uri="+uri;
- }
-
- public FreenetURI getURI() {
- return uri;
- }
-
- public String getName() {
- return "BlockInserter for "+this.getNumber();
- }
-
- public void run() {
- try {
- Logger.minor(this, "Running "+this);
- if(fetchedData == null)
- throw new NullPointerException();
- realRun();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" on "+this, t);
- fatalError(t, InserterException.INTERNAL_ERROR);
- } finally {
- completedTries++;
- }
- }
-
- private void realRun() {
- FileInserter inserter = new FileInserter(ctx);
- try {
- if(uri == null && !getCHKOnly)
- uri = inserter.run(block, false, true, true, null);
- uri = inserter.run(block, false, getCHKOnly, true, null);
- succeeded = true;
- tracker.success(this);
- } catch (InserterException e) {
- int mode = e.getMode();
- switch(mode) {
- case InserterException.ROUTE_NOT_FOUND:
- // N consecutive RNFs = success
- if(ctx.consecutiveRNFsCountAsSuccess > 0) {
- rnfs++;
- if(rnfs >= ctx.consecutiveRNFsCountAsSuccess) {
- succeeded = true;
- tracker.success(this);
- return;
- }
- }
- nonfatalError(e, mode);
- return;
- case InserterException.REJECTED_OVERLOAD:
- case InserterException.ROUTE_REALLY_NOT_FOUND:
- rnfs = 0;
- nonfatalError(e, mode);
- return;
- case InserterException.INTERNAL_ERROR:
- case InserterException.BUCKET_ERROR:
- fatalError(e, mode);
- return;
- case InserterException.FATAL_ERRORS_IN_BLOCKS:
- case InserterException.TOO_MANY_RETRIES_IN_BLOCKS:
- // Huh?
- Logger.error(this, "Got error inserting blocks
("+e.getMessage()+") while inserting a block - WTF?");
- fatalError(e, InserterException.INTERNAL_ERROR);
- return;
- case InserterException.INVALID_URI:
- Logger.error(this, "Got invalid URI error but
URI was CHK@ in block insert");
- fatalError(e, InserterException.INTERNAL_ERROR);
- return;
- default:
- rnfs = 0;
- Logger.error(this, "Unknown insert error
"+mode+" while inserting a block");
- fatalError(e, InserterException.INTERNAL_ERROR);
- return;
- }
- // FIXME add more cases as we create them
- }
-
- }
-
- private void fatalError(InserterException e, int code) {
- Logger.normal(this, "Giving up on block: "+this+": "+e);
- tracker.fatalError(this, code);
- ctx.eventProducer.produceEvent(new BlockInsertErrorEvent(e, uri, completedTries));
- }
-
- private void fatalError(Throwable t, int code) {
- // Don't need to include uri
- fatalError(new InserterException(code, t, null), code);
- }
-
- private void nonfatalError(InserterException e, int code) {
- Logger.minor(this, "Non-fatal error on "+this+": "+e);
- tracker.nonfatalError(this, code);
- ctx.eventProducer.produceEvent(new BlockInsertErrorEvent(e, uri, completedTries));
- }
-
- protected void checkStartable() {
- if(succeeded)
- throw new IllegalStateException("Already inserted
block");
- }
-
- public int getRetryCount() {
- return completedTries;
- }
-}
Modified: trunk/freenet/src/freenet/client/ClientMetadata.java
===================================================================
--- trunk/freenet/src/freenet/client/ClientMetadata.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/ClientMetadata.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -13,7 +13,7 @@
}
/** Create an empty ClientMetadata instance */
- ClientMetadata() {
+ public ClientMetadata() {
mimeType = null;
}
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/FECCodec.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -2,6 +2,7 @@
import java.io.IOException;
+import freenet.support.Bucket;
import freenet.support.BucketFactory;
/**
@@ -11,7 +12,7 @@
* @author root
*
*/
-abstract class FECCodec {
+public abstract class FECCodec {
/**
 * Get a codec where we know both the number of data blocks and the number
@@ -67,6 +68,17 @@
 public abstract void encode(SplitfileBlock[] dataBlocks, SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory)
throws IOException;
/**
+ * Encode all missing *check* blocks.
+ * Requires that all the data blocks be present.
+ * @param dataBlocks The data blocks.
+ * @param checkBlocks The check blocks.
+ * @param blockLength The block length in bytes.
+ * @param bucketFactory The BucketFactory to use to generate buckets.
+ * @throws IOException If there is an error in encoding caused by an I/O error (usually involving buckets).
+ */
+ public abstract void encode(Bucket[] dataBlocks, Bucket[] checkBlocks, int blockLength, BucketFactory bucketFactory) throws IOException;
+
+ /**
* How many check blocks?
*/
public abstract int countCheckBlocks();
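
The new encode(Bucket[], Bucket[], int, BucketFactory) above fills in only the
missing check blocks, given a complete set of data blocks. As a toy
illustration of that contract (a single XOR parity scheme over byte arrays,
not the Onion/Vandermonde FEC the real codec uses):

    // Toy stand-in for the "encode missing check blocks" contract.
    class XorParityCodec {
        /** Recomputes every null entry in checkBlocks; all dataBlocks must be non-null. */
        void encode(byte[][] dataBlocks, byte[][] checkBlocks, int blockLength) {
            for (int i = 0; i < checkBlocks.length; i++) {
                if (checkBlocks[i] != null) continue; // already present, leave it alone
                byte[] parity = new byte[blockLength];
                for (int d = 0; d < dataBlocks.length; d++) {
                    for (int j = 0; j < blockLength; j++) {
                        parity[j] ^= dataBlocks[d][j];
                    }
                }
                checkBlocks[i] = parity;
            }
        }
    }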
Modified: trunk/freenet/src/freenet/client/FailureCodeTracker.java
===================================================================
--- trunk/freenet/src/freenet/client/FailureCodeTracker.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/FailureCodeTracker.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -106,5 +106,30 @@
public synchronized int getFirstCode() {
return ((Integer) map.keySet().toArray()[0]).intValue();
}
+
+ public synchronized boolean isFatal(boolean insert) {
+ Iterator i = map.keySet().iterator();
+ while(i.hasNext()) {
+ Integer code = (Integer) i.next();
+ if(((Item)map.get(code)).x == 0) continue;
+ if(insert) {
+ if(InserterException.isFatal(code.intValue())) return true;
+ } else {
+ if(FetchException.isFatal(code.intValue())) return true;
+ }
+ }
+ return false;
+ }
+
+ public void merge(InserterException e) {
+ if(!insert) throw new IllegalArgumentException("This is not an
insert yet merge("+e+") called!");
+ if(e.errorCodes != null)
+ merge(e.errorCodes);
+ inc(e.getMode());
+ }
+
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
}
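
isFatal(boolean insert) above scans every accumulated error code with a
non-zero count and reports whether any of them is fatal for the request type,
so a retry loop can give up early. The shape of the pattern, with hypothetical
names (the classifier stands in for InserterException.isFatal or
FetchException.isFatal):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    class FailureCounts {
        interface FatalClassifier { boolean isFatal(int code); }

        private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();

        synchronized void inc(int code) {
            Integer old = counts.get(Integer.valueOf(code));
            counts.put(Integer.valueOf(code), Integer.valueOf(old == null ? 1 : old.intValue() + 1));
        }

        /** True if any code seen at least once is classed as fatal. */
        synchronized boolean isFatal(FatalClassifier classifier) {
            for (Iterator<Map.Entry<Integer, Integer>> it = counts.entrySet().iterator(); it.hasNext();) {
                Map.Entry<Integer, Integer> e = it.next();
                if (e.getValue().intValue() > 0 && classifier.isFatal(e.getKey().intValue())) return true;
            }
            return false;
        }
    }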
Modified: trunk/freenet/src/freenet/client/FetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchException.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/FetchException.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -40,7 +40,7 @@
}
public FetchException(ArchiveFailureException e) {
- super(getMessage(INVALID_METADATA)+": "+e.getMessage());
+ super(getMessage(ARCHIVE_FAILURE)+": "+e.getMessage());
extraMessage = e.getMessage();
mode = ARCHIVE_FAILURE;
errorCodes = null;
@@ -48,6 +48,14 @@
Logger.minor(this, "FetchException("+getMessage(mode)+"):
"+e,e);
}
+ public FetchException(ArchiveRestartException e) {
+ super(getMessage(ARCHIVE_RESTART)+": "+e.getMessage());
+ extraMessage = e.getMessage();
+ mode = ARCHIVE_FAILURE;
+ errorCodes = null;
+ initCause(e);
+ Logger.minor(this, "FetchException("+getMessage(mode)+"):
"+e,e); }
+
public FetchException(int mode, Throwable t) {
super(getMessage(mode)+": "+t.getMessage());
extraMessage = t.getMessage();
@@ -126,6 +134,8 @@
return "No default document; give more metastrings in
URI";
case CANCELLED:
return "Cancelled by caller";
+ case ARCHIVE_RESTART:
+ return "Archive restart";
default:
return "Unknown fetch error code: "+mode;
}
@@ -183,9 +193,15 @@
public static final int NOT_ENOUGH_METASTRINGS = 24;
/** Explicitly cancelled */
public static final int CANCELLED = 25;
+ /** Archive restart */
+ public static final int ARCHIVE_RESTART = 26;
/** Is an error fatal i.e. is there no point retrying? */
public boolean isFatal() {
+ return isFatal(mode);
+ }
+
+ public static boolean isFatal(int mode) {
switch(mode) {
// Problems with the data as inserted. No point retrying.
case FetchException.ARCHIVE_FAILURE:
@@ -200,6 +216,7 @@
case FetchException.TOO_MUCH_RECURSION:
case FetchException.UNKNOWN_METADATA:
case FetchException.UNKNOWN_SPLITFILE_METADATA:
+ case FetchException.TOO_BIG:
return true;
// Low level errors, can be retried
@@ -218,12 +235,14 @@
// Fatal, because there are internal retries
return true;
+ // Weird ones
case FetchException.CANCELLED:
+ case FetchException.ARCHIVE_RESTART:
// Fatal
return true;
default:
- Logger.error(this, "Do not know if error code is fatal:
"+getMessage(mode));
+ Logger.error(FetchException.class, "Do not know if
error code is fatal: "+getMessage(mode));
return false; // assume it isn't
}
}
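
Making isFatal(int) static above means code that only has a bare error code,
such as FailureCodeTracker, can classify it without constructing an exception.
A minimal sketch of the retry policy this supports (the helper names are
hypothetical; the fatal classification would come from FetchException.isFatal):

    class RetryPolicy {
        interface Attempt { void run() throws Exception; }
        interface ModeClassifier {
            int modeOf(Exception e);
            boolean isFatal(int mode);
        }

        static void runWithRetries(Attempt attempt, ModeClassifier classifier, int maxRetries) throws Exception {
            Exception last = null;
            for (int i = 0; i <= maxRetries; i++) {
                try {
                    attempt.run();
                    return; // success
                } catch (Exception e) {
                    last = e;
                    // Fatal codes mean the data itself is bad; retrying cannot help.
                    if (classifier.isFatal(classifier.modeOf(e))) throw e;
                }
            }
            throw last; // retries exhausted
        }
    }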
Copied: trunk/freenet/src/freenet/client/FetchWaiter.java (from rev 7929, branches/async-client/src/freenet/client/FetchWaiter.java)
Deleted: trunk/freenet/src/freenet/client/Fetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/Fetcher.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/Fetcher.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,401 +0,0 @@
-package freenet.client;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.LinkedList;
-
-import freenet.client.events.DecodedBlockEvent;
-import freenet.client.events.FetchedMetadataEvent;
-import freenet.client.events.GotBlockEvent;
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.FreenetURI;
-import freenet.keys.KeyDecodeException;
-import freenet.node.LowLevelGetException;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-
-/** Class that does the actual fetching. Does not have to have a user friendly
- * interface!
- */
-public class Fetcher {
-
- /** The original URI to be fetched. */
- final FreenetURI origURI;
- /** The settings for the fetch e.g. max file size */
- final FetcherContext ctx;
- /** The archive context object to be passed down the entire request. This is
- * recreated if we get an ArchiveRestartException. It does loop detection, partly
- * in order to prevent rare deadlocks.
- */
- ArchiveContext archiveContext;
-
- /**
- * Local-only constructor, with ArchiveContext, for recursion via e.g. archives.
- */
- public Fetcher(FreenetURI uri, FetcherContext fctx, ArchiveContext actx) {
- if(uri == null) throw new NullPointerException();
- origURI = uri;
- ctx = fctx;
- archiveContext = actx;
- }
-
- /**
- * Create a Fetcher. Public constructor, for when starting a new request chain.
- * @param uri The key to fetch.
- * @param ctx The settings for the fetch.
- */
- public Fetcher(FreenetURI uri, FetcherContext ctx) {
- this(uri, ctx, new ArchiveContext());
- }
-
- /**
- * Fetch the key. Called by clients.
- * @return The key requested's data and client metadata.
- * @throws FetchException If we cannot fetch the key for some reason. Various
- * other exceptions are used internally; they are converted to a FetchException
- * by this driver routine.
- */
- public FetchResult run() throws FetchException {
- FailureCodeTracker tracker = new FailureCodeTracker(false);
- FetchException lastThrown = null;
- for(int j=0;j<ctx.maxNonSplitfileRetries+1;j++) {
- try {
- return runOnce();
- } catch (FetchException e) {
- lastThrown = e;
- tracker.merge(e);
- if(e.isFatal()) throw e;
- Logger.normal(this, "Possibly retrying "+this+"
despite "+e, e);
- continue;
- }
- }
- if(tracker.totalCount() == 1)
- throw lastThrown;
- else {
- if(tracker.isOneCodeOnly())
- throw new FetchException(tracker.getFirstCode());
- throw new FetchException(FetchException.SPLITFILE_ERROR, tracker);
- }
- }
-
- public FetchResult runOnce() throws FetchException {
- for(int i=0;i<ctx.maxArchiveRestarts;i++) {
- try {
- if(ctx.isCancelled()) throw new FetchException(FetchException.CANCELLED);
- ClientMetadata dm = new ClientMetadata();
- ClientKey key;
- try {
- key = ClientKey.getBaseKey(origURI);
- } catch (MalformedURLException e2) {
- throw new FetchException(FetchException.INVALID_URI, "Invalid URI: "+origURI);
- }
- LinkedList metaStrings = origURI.listMetaStrings();
-
- FetchResult fr = realRun(dm, 0, key, metaStrings, ctx.dontEnterImplicitArchives, ctx.localRequestOnly);
-
- if(metaStrings.isEmpty()) return fr;
- // Still got some meta-strings
- throw new FetchException(FetchException.HAS_MORE_METASTRINGS);
-
- } catch (ArchiveRestartException e) {
- archiveContext = new ArchiveContext();
- continue;
- } catch (MetadataParseException e) {
- throw new FetchException(e);
- } catch (ArchiveFailureException e) {
- if(e.getMessage().equals(ArchiveFailureException.TOO_MANY_LEVELS))
- throw new FetchException(FetchException.TOO_DEEP_ARCHIVE_RECURSION);
- throw new FetchException(e);
- }
- }
- throw new FetchException(FetchException.TOO_MANY_ARCHIVE_RESTARTS);
- }
-
- FetchResult realRun(ClientMetadata dm, int recursionLevel, FreenetURI uri, boolean dontEnterImplicitArchives, boolean localOnly)
- throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
- ClientKey key;
- try {
- key = ClientKey.getBaseKey(origURI);
- } catch (MalformedURLException e2) {
- throw new FetchException(FetchException.INVALID_URI, "Invalid URI: "+origURI);
- }
- LinkedList metaStrings = origURI.listMetaStrings();
-
- return realRun(dm, recursionLevel, key, metaStrings, dontEnterImplicitArchives, localOnly);
- }
-
- /**
- * Fetch a key, within an overall fetch process. Called by self in recursion, and
- * called by driver function @see run() .
- * @param dm The client metadata object to accumulate client metadata in.
- * @param recursionLevel The recursion level. Incremented every time we enter
- * realRun(). If it goes above a certain limit, we throw a FetchException.
- * @param uri The URI to fetch.
- * @return The data, complete with client metadata.
- * @throws FetchException If we could not fetch the data.
- * @throws MetadataParseException If we could not parse the metadata.
- * @throws ArchiveFailureException If we could not extract data from an archive.
- * @throws ArchiveRestartException
- */
- FetchResult realRun(ClientMetadata dm, int recursionLevel, ClientKey key, LinkedList metaStrings, boolean dontEnterImplicitArchives, boolean localOnly)
- throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
- Logger.minor(this, "Running fetch for: "+key);
- if(ctx.isCancelled()) throw new FetchException(FetchException.CANCELLED);
- recursionLevel++;
- if(recursionLevel > ctx.maxRecursionLevel)
- throw new FetchException(FetchException.TOO_MUCH_RECURSION, ""+recursionLevel+" should be < "+ctx.maxRecursionLevel);
-
- // Do the fetch
- ClientKeyBlock block;
- try {
- block = ctx.client.getKey(key, localOnly, ctx.starterClient, ctx.cacheLocalRequests, ctx.ignoreStore);
- if(ctx.isCancelled()) throw new FetchException(FetchException.CANCELLED);
- } catch (LowLevelGetException e) {
- switch(e.code) {
- case LowLevelGetException.DATA_NOT_FOUND:
- throw new FetchException(FetchException.DATA_NOT_FOUND);
- case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- throw new FetchException(FetchException.DATA_NOT_FOUND);
- case LowLevelGetException.DECODE_FAILED:
- throw new FetchException(FetchException.BLOCK_DECODE_ERROR);
- case LowLevelGetException.INTERNAL_ERROR:
- throw new FetchException(FetchException.INTERNAL_ERROR);
- case LowLevelGetException.REJECTED_OVERLOAD:
- throw new FetchException(FetchException.REJECTED_OVERLOAD);
- case LowLevelGetException.ROUTE_NOT_FOUND:
- throw new FetchException(FetchException.ROUTE_NOT_FOUND);
- case LowLevelGetException.TRANSFER_FAILED:
- throw new FetchException(FetchException.TRANSFER_FAILED);
- case LowLevelGetException.VERIFY_FAILED:
- throw new FetchException(FetchException.BLOCK_DECODE_ERROR);
- default:
- Logger.error(this, "Unknown LowLevelGetException code: "+e.code);
- throw new FetchException(FetchException.INTERNAL_ERROR);
- }
- }
-
- ctx.eventProducer.produceEvent(new GotBlockEvent(key));
-
- Bucket data;
- try {
- data = block.decode(ctx.bucketFactory, (int) (Math.min(ctx.maxTempLength, Integer.MAX_VALUE)));
- } catch (KeyDecodeException e1) {
- throw new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage());
- } catch (IOException e) {
- Logger.error(this, "Could not capture data - disk full?: "+e, e);
- throw new FetchException(FetchException.BUCKET_ERROR, e);
- }
-
- ctx.eventProducer.produceEvent(new DecodedBlockEvent(key));
-
- if(!block.isMetadata()) {
- // Just return the data
- return new FetchResult(dm, data);
- }
-
- // Otherwise we need to parse the metadata
-
- if(data.size() > ctx.maxMetadataSize)
- throw new FetchException(FetchException.TOO_BIG_METADATA);
- Metadata metadata;
- try {
- metadata = Metadata.construct(BucketTools.toByteArray(data));
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR, e);
- }
-
- ctx.eventProducer.produceEvent(new FetchedMetadataEvent());
-
- return runMetadata(dm, recursionLevel, key, metaStrings, metadata, null, key.getURI(), dontEnterImplicitArchives, localOnly);
- }
-
- /**
- * Fetch data, from metadata.
- * @param recursionLevel The recursion level, from above. Not incremented here, as we will
- * go through realRun() if the key changes, so the number of passes here is severely limited.
- * @param key The key being fetched.
- * @param metaStrings List of unused meta strings (to be used by manifests).
- * @param metadata The parsed metadata to process.
- * @param container The container in which this metadata is found.
- * @throws MetadataParseException If we could not parse metadata from a sub-document. Will be
- * converted to a FetchException above.
- * @throws ArchiveFailureException If extracting data from an archive failed.
- * @throws FetchException If the fetch failed for some reason.
- * @throws ArchiveRestartException
- */
- private FetchResult runMetadata(ClientMetadata dm, int recursionLevel, ClientKey key, LinkedList metaStrings,
- Metadata metadata, ArchiveHandler container, FreenetURI thisKey, boolean dontEnterImplicitArchives, boolean localOnly)
- throws MetadataParseException, FetchException, ArchiveFailureException, ArchiveRestartException {
-
- if(ctx.isCancelled()) throw new FetchException(FetchException.CANCELLED);
- if(metadata.isSimpleManifest()) {
- String name;
- if(metaStrings.isEmpty())
- name = null;
- else
- name = (String) metaStrings.removeFirst();
- // Since metadata is a document, we just replace metadata here
- if(name == null) {
- metadata = metadata.getDefaultDocument();
- if(metadata == null)
- throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
- } else {
- metadata = metadata.getDocument(name);
- thisKey = thisKey.pushMetaString(name);
- if(metadata == null)
- throw new FetchException(FetchException.NOT_IN_ARCHIVE);
- }
- return runMetadata(dm, recursionLevel, key, metaStrings, metadata, container, thisKey, dontEnterImplicitArchives, localOnly);
- } else if(metadata.isArchiveManifest()) {
- container = ctx.archiveManager.makeHandler(thisKey, metadata.getArchiveType(), false);
- Bucket metadataBucket = container.getMetadata(archiveContext, ctx, dm, recursionLevel, true);
- try {
- metadata = Metadata.construct(metadataBucket);
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR);
- }
- return runMetadata(dm, recursionLevel+1, key, metaStrings, metadata, container, thisKey, dontEnterImplicitArchives, localOnly);
- } else if(metadata.isArchiveInternalRedirect()) {
- if(container == null)
- throw new FetchException(FetchException.NOT_IN_ARCHIVE);
- else {
- /* Implicit archive handling:
- * Sooner or later we reach a SimpleFileRedirect to data, a Splitfile to data,
- * or an ArchiveInternalRedirect to data.
- *
- * In this case, if it is an archive type, if implicit archive handling is enabled, and if
- * we have more meta-strings, we can try to enter it.
- */
- if((!dontEnterImplicitArchives) && ArchiveManager.isUsableArchiveType(dm.getMIMEType()) && (!metaStrings.isEmpty())) {
- // Possible implicit archive inside archive?
- container = ctx.archiveManager.makeHandler(thisKey, ArchiveManager.getArchiveType(dm.getMIMEType()), false);
- Bucket metadataBucket = container.getMetadata(archiveContext, ctx, dm, recursionLevel, true);
- try {
- metadata = Metadata.construct(metadataBucket);
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR);
- }
- return runMetadata(dm, recursionLevel+1, key, metaStrings, metadata, container, thisKey, dontEnterImplicitArchives, localOnly);
- }
- Bucket result = container.get(metadata.getZIPInternalName(), archiveContext, ctx, dm, recursionLevel, true);
- dm.mergeNoOverwrite(metadata.getClientMetadata());
- return new FetchResult(dm, result);
- }
- } else if(metadata.isMultiLevelMetadata()) {
- // Doesn't have to be a splitfile; could be from a ZIP or a plain file.
- metadata.setSimpleRedirect();
- FetchResult res = runMetadata(dm, recursionLevel, key, metaStrings, metadata, container, thisKey, true, localOnly);
- try {
- metadata = Metadata.construct(res.data);
- } catch (MetadataParseException e) {
- throw new FetchException(FetchException.INVALID_METADATA, e);
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR, e);
- }
- return runMetadata(dm, recursionLevel, key, metaStrings, metadata, container, thisKey, dontEnterImplicitArchives, localOnly);
- } else if(metadata.isSingleFileRedirect()) {
- FreenetURI uri = metadata.getSingleTarget();
- dm.mergeNoOverwrite(metadata.getClientMetadata());
- if((!dontEnterImplicitArchives) &&
ArchiveManager.isUsableArchiveType(dm.getMIMEType()) &&
(!metaStrings.isEmpty())) {
- // Is probably an implicit archive.
- ClientKey target;
- try {
- target = ClientKey.getBaseKey(uri);
- } catch (MalformedURLException e1) {
- throw new
FetchException(FetchException.INVALID_URI, "Invalid URI: "+uri);
- }
- // Probably a usable archive as-is. We may not
have to fetch it.
- container = ctx.archiveManager.makeHandler(uri,
ArchiveManager.getArchiveType(dm.getMIMEType()), true);
- if(container != null) {
- Bucket metadataBucket =
container.getMetadata(archiveContext, ctx, dm, recursionLevel, true);
- try {
- metadata =
Metadata.construct(metadataBucket);
- } catch (IOException e) {
- throw new
FetchException(FetchException.BUCKET_ERROR);
- }
- return runMetadata(dm,
recursionLevel+1, key, metaStrings, metadata, container, thisKey,
dontEnterImplicitArchives, localOnly);
- } // else just fetch it, create context later
- }
-
-
- ClientKey newKey;
- try {
- newKey = ClientKey.getBaseKey(uri);
- } catch (MalformedURLException e2) {
- throw new FetchException(FetchException.INVALID_URI, "Invalid URI: "+uri);
- }
-
- LinkedList newMetaStrings = uri.listMetaStrings();
-
- // Move any new meta strings to beginning of our list of remaining meta strings
- while(!newMetaStrings.isEmpty()) {
- Object o = newMetaStrings.removeLast();
- metaStrings.addFirst(o);
- }
-
- FetchResult fr = realRun(dm, recursionLevel, newKey, metaStrings, dontEnterImplicitArchives, localOnly);
- if(metadata.isCompressed()) {
- Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.compressionCodec);
- Bucket data = fr.data;
- Bucket output;
- try {
- long maxLen = ctx.maxTempLength;
- if(maxLen < 0) maxLen = Long.MAX_VALUE;
- output = codec.decompress(data, ctx.bucketFactory, maxLen);
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR, e);
- } catch (CompressionOutputSizeException e) {
- throw new FetchException(FetchException.TOO_BIG);
- }
- return new FetchResult(fr, output);
- }
- return fr;
- } else if(metadata.isSplitfile()) {
- // Straight data splitfile.
- // Might be used by parents for something else, in which case they will set dontEnterImplicitArchives.
- dm.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
- if((!dontEnterImplicitArchives) && ArchiveManager.isUsableArchiveType(dm.getMIMEType()) && (!metaStrings.isEmpty())) {
- // We know target is not metadata.
- container = ctx.archiveManager.makeHandler(thisKey, ArchiveManager.getArchiveType(dm.getMIMEType()), false);
- Bucket metadataBucket = container.getMetadata(archiveContext, ctx, dm, recursionLevel, true);
- try {
- metadata = Metadata.construct(metadataBucket);
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR, e);
- }
- return runMetadata(dm, recursionLevel+1, key, metaStrings, metadata, container, thisKey, dontEnterImplicitArchives, localOnly);
- }
-
- FetcherContext newCtx;
- if(metadata.splitUseLengths)
- newCtx = new FetcherContext(ctx, FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
- else
- newCtx = new FetcherContext(ctx, FetcherContext.SPLITFILE_DEFAULT_MASK);
-
- SplitFetcher sf = new SplitFetcher(metadata, archiveContext, newCtx, recursionLevel);
- Bucket sfResult = sf.fetch(); // will throw in event of error
- if(metadata.isCompressed()) {
- Logger.minor(this, "Is compressed: "+metadata.compressionCodec);
- Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.compressionCodec);
- try {
- long maxLen = ctx.maxTempLength;
- if(maxLen < 0) maxLen = Long.MAX_VALUE;
- sfResult = codec.decompress(sfResult, ctx.bucketFactory, maxLen);
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR, e);
- } catch (CompressionOutputSizeException e) {
- throw new FetchException(FetchException.TOO_BIG);
- }
- } else
- Logger.minor(this, "Not compressed ("+metadata.compressionCodec+")");
- return new FetchResult(dm, sfResult);
- } else {
- Logger.error(this, "Don't know what to do with metadata: "+metadata);
- throw new FetchException(FetchException.UNKNOWN_METADATA);
- }
- }
-}
Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/FetcherContext.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -2,22 +2,20 @@
import freenet.client.events.ClientEventProducer;
import freenet.crypt.RandomSource;
-import freenet.node.RequestStarterClient;
-import freenet.node.SimpleLowLevelClient;
import freenet.support.BucketFactory;
/** Context for a Fetcher. Contains all the settings a Fetcher needs to know about. */
public class FetcherContext implements Cloneable {
public static final int IDENTICAL_MASK = 0;
- static final int SPLITFILE_DEFAULT_BLOCK_MASK = 1;
- static final int SPLITFILE_DEFAULT_MASK = 2;
- static final int SPLITFILE_USE_LENGTHS_MASK = 3;
+ public static final int SPLITFILE_DEFAULT_BLOCK_MASK = 1;
+ public static final int SPLITFILE_DEFAULT_MASK = 2;
+ public static final int SPLITFILE_USE_LENGTHS_MASK = 3;
+ public static final int SET_RETURN_ARCHIVES = 4;
/** Low-level client to send low-level requests to. */
- final SimpleLowLevelClient client;
public long maxOutputLength;
public long maxTempLength;
- final ArchiveManager archiveManager;
+ public final ArchiveManager archiveManager;
public final BucketFactory bucketFactory;
public int maxRecursionLevel;
public int maxArchiveRestarts;
@@ -37,23 +35,24 @@
public int maxMetadataSize;
public int maxDataBlocksPerSegment;
public int maxCheckBlocksPerSegment;
- public final RequestStarterClient starterClient;
public boolean cacheLocalRequests;
private boolean cancelled;
+ /** If true, and we get a ZIP manifest, and we have no meta-strings left, then
+ * return the manifest contents as data. */
+ public boolean returnZIPManifests;
public final boolean isCancelled() {
return cancelled;
}
- public FetcherContext(SimpleLowLevelClient client, long curMaxLength,
+ public FetcherContext(long curMaxLength,
 long curMaxTempLength, int maxMetadataSize, int maxRecursionLevel, int maxArchiveRestarts,
 boolean dontEnterImplicitArchives, int maxSplitfileThreads,
 int maxSplitfileBlockRetries, int maxNonSplitfileRetries,
 boolean allowSplitfiles, boolean followRedirects, boolean localRequestOnly,
 int maxDataBlocksPerSegment, int maxCheckBlocksPerSegment,
 RandomSource random, ArchiveManager archiveManager, BucketFactory bucketFactory,
- ClientEventProducer producer, RequestStarterClient starter, boolean cacheLocalRequests) {
- this.client = client;
+ ClientEventProducer producer, boolean cacheLocalRequests) {
this.maxOutputLength = curMaxLength;
this.maxTempLength = curMaxTempLength;
this.maxMetadataSize = maxMetadataSize;
@@ -73,13 +72,11 @@
this.eventProducer = producer;
this.maxDataBlocksPerSegment = maxDataBlocksPerSegment;
this.maxCheckBlocksPerSegment = maxCheckBlocksPerSegment;
- this.starterClient = starter;
this.cacheLocalRequests = cacheLocalRequests;
}
public FetcherContext(FetcherContext ctx, int maskID) {
if(maskID == IDENTICAL_MASK) {
- this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
this.maxMetadataSize = ctx.maxMetadataSize;
this.maxTempLength = ctx.maxTempLength;
@@ -99,10 +96,9 @@
this.eventProducer = ctx.eventProducer;
 this.maxDataBlocksPerSegment = ctx.maxDataBlocksPerSegment;
 this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
- this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = ctx.returnZIPManifests;
} else if(maskID == SPLITFILE_DEFAULT_BLOCK_MASK) {
- this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
this.maxMetadataSize = ctx.maxMetadataSize;
this.maxTempLength = ctx.maxTempLength;
@@ -122,10 +118,9 @@
this.eventProducer = ctx.eventProducer;
this.maxDataBlocksPerSegment = 0;
this.maxCheckBlocksPerSegment = 0;
- this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = false;
} else if(maskID == SPLITFILE_DEFAULT_MASK) {
- this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
this.maxTempLength = ctx.maxTempLength;
this.maxMetadataSize = ctx.maxMetadataSize;
@@ -145,10 +140,9 @@
this.eventProducer = ctx.eventProducer;
 this.maxDataBlocksPerSegment = ctx.maxDataBlocksPerSegment;
 this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
- this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = ctx.returnZIPManifests;
} else if(maskID == SPLITFILE_USE_LENGTHS_MASK) {
- this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
this.maxTempLength = ctx.maxTempLength;
this.maxMetadataSize = ctx.maxMetadataSize;
@@ -168,9 +162,32 @@
this.eventProducer = ctx.eventProducer;
 this.maxDataBlocksPerSegment = ctx.maxDataBlocksPerSegment;
 this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
- this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
- } else throw new IllegalArgumentException();
+ this.returnZIPManifests = ctx.returnZIPManifests;
+ } else if (maskID == SET_RETURN_ARCHIVES) {
+ this.maxOutputLength = ctx.maxOutputLength;
+ this.maxMetadataSize = ctx.maxMetadataSize;
+ this.maxTempLength = ctx.maxTempLength;
+ this.archiveManager = ctx.archiveManager;
+ this.bucketFactory = ctx.bucketFactory;
+ this.maxRecursionLevel = ctx.maxRecursionLevel;
+ this.maxArchiveRestarts = ctx.maxArchiveRestarts;
+ this.dontEnterImplicitArchives = ctx.dontEnterImplicitArchives;
+ this.random = ctx.random;
+ this.maxSplitfileThreads = ctx.maxSplitfileThreads;
+ this.maxSplitfileBlockRetries = ctx.maxSplitfileBlockRetries;
+ this.maxNonSplitfileRetries = ctx.maxNonSplitfileRetries;
+ this.allowSplitfiles = ctx.allowSplitfiles;
+ this.followRedirects = ctx.followRedirects;
+ this.localRequestOnly = ctx.localRequestOnly;
+ this.splitfileUseLengths = ctx.splitfileUseLengths;
+ this.eventProducer = ctx.eventProducer;
+ this.maxDataBlocksPerSegment = ctx.maxDataBlocksPerSegment;
+ this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
+ this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = true;
+ }
+ else throw new IllegalArgumentException();
}
public void cancel() {
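
The new SET_RETURN_ARCHIVES branch above follows the same pattern as the other
masks: copy every field from the parent context, then override the one setting
the mask names (returnZIPManifests = true). A condensed sketch of the
mask-based copy-constructor idiom, with hypothetical field names:

    class Context {
        static final int IDENTICAL = 0;
        static final int RETURN_ARCHIVES = 4;

        final long maxOutputLength;
        final boolean returnArchives;

        Context(long maxOutputLength, boolean returnArchives) {
            this.maxOutputLength = maxOutputLength;
            this.returnArchives = returnArchives;
        }

        /** Derive a variant of an existing context according to the mask. */
        Context(Context ctx, int mask) {
            this.maxOutputLength = ctx.maxOutputLength; // shared settings copied verbatim
            switch (mask) {
            case IDENTICAL:
                this.returnArchives = ctx.returnArchives;
                break;
            case RETURN_ARCHIVES:
                this.returnArchives = true; // the single override
                break;
            default:
                throw new IllegalArgumentException("unknown mask " + mask);
            }
        }
    }

The duplication is the price of keeping every field final; a builder would
avoid it, but the mask form keeps derived contexts immutable and cheap.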
Deleted: trunk/freenet/src/freenet/client/FileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/FileInserter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/FileInserter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,328 +0,0 @@
-package freenet.client;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.MalformedURLException;
-
-import freenet.client.events.SimpleBlockPutEvent;
-import freenet.keys.CHKBlock;
-import freenet.keys.CHKEncodeException;
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientSSKBlock;
-import freenet.keys.FreenetURI;
-import freenet.keys.InsertableClientSSK;
-import freenet.keys.NodeCHK;
-import freenet.keys.SSKBlock;
-import freenet.keys.SSKEncodeException;
-import freenet.node.LowLevelPutException;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-
-/**
- * Class that does high-level inserts.
- */
-public class FileInserter {
-
- InserterContext ctx;
-
- public FileInserter(InserterContext context) {
- this.ctx = context;
- }
-
- /**
- * Do an insert.
- * @param block The data to insert.
- * @param localOnly
- * @param returnMetadata If not null, return the metadata in this bucket, rather
- * than inserting it; return the *data* CHK only. This is used by e.g.
- * MultiFileInserter, where we will aggregate the metadata elsewhere.
- * Only supported on CHKs.
- * @return The URI of the inserted data.
- * @throws InserterException
- */
- public FreenetURI run(InsertBlock block, boolean metadata, boolean getCHKOnly, boolean noRetries, Bucket returnMetadata) throws InserterException {
- if(block.data == null)
- throw new NullPointerException();
- String type = block.desiredURI.getKeyType();
- if(type.equalsIgnoreCase("CHK")) {
- if(!block.desiredURI.toString(false).equalsIgnoreCase("CHK@"))
- throw new InserterException(InserterException.INVALID_URI, null);
- } else if(!(type.equalsIgnoreCase("SSK") || type.equalsIgnoreCase("KSK"))) {
- throw new InserterException(InserterException.INVALID_URI, null);
- }
-
- // Insert the content.
- // If we have reason to create a metadata document, include the client metadata.
- // Otherwise only create one (a redirect) with the client metadata, if there is any.
-
- // First, can it fit into a single block?
-
- Bucket origData = block.data;
- Bucket data = block.data;
- int blockSize;
- int maxSourceDataSize;
- boolean isSSK = false;
- boolean dontCompress = false;
-
- long origSize = data.size();
- if(type.equals("SSK") || type.equals("KSK")) {
- blockSize = SSKBlock.DATA_LENGTH;
- isSSK = true;
- maxSourceDataSize = ClientSSKBlock.MAX_DECOMPRESSED_DATA_LENGTH;
- if(origSize > maxSourceDataSize)
- dontCompress = true;
- // If too big to fit in an SSK, don't even try.
- } else if(block.desiredURI.getKeyType().equals("CHK")) {
- blockSize = CHKBlock.DATA_LENGTH;
- maxSourceDataSize = ClientCHKBlock.MAX_LENGTH_BEFORE_COMPRESSION;
- } else {
- throw new InserterException(InserterException.INVALID_URI);
- }
-
- ClientCHKBlock chk;
-
- Compressor bestCodec = null;
- Bucket bestCompressedData = null;
-
- if(origSize > blockSize && (!ctx.dontCompress) &&
(!dontCompress)) {
- // Try to compress the data.
- // Try each algorithm, starting with the fastest and
weakest.
- // Stop when run out of algorithms, or the compressed
data fits in a single block.
- int algos = Compressor.countCompressAlgorithms();
- try {
- for(int i=0;i<algos;i++) {
- Compressor comp =
Compressor.getCompressionAlgorithmByDifficulty(i);
- Bucket result;
- result = comp.compress(origData,
ctx.bf, Long.MAX_VALUE);
- if(result.size() < blockSize) {
- bestCodec = comp;
- data = result;
- if(bestCompressedData != null)
-
ctx.bf.freeBucket(bestCompressedData);
- bestCompressedData = data;
- break;
- }
- if(bestCompressedData != null &&
result.size() < bestCompressedData.size()) {
-
ctx.bf.freeBucket(bestCompressedData);
- bestCompressedData = result;
- data = result;
- bestCodec = comp;
- } else if(bestCompressedData == null &&
result.size() < data.size()) {
- bestCompressedData = result;
- bestCodec = comp;
- data = result;
- }
- }
- } catch (IOException e) {
- throw new
InserterException(InserterException.BUCKET_ERROR, e, null);
- } catch (CompressionOutputSizeException e) {
- // Impossible
- throw new Error(e);
- }
- }
-
- InsertableClientSSK isk = null;
-
- if(isSSK && data.size() <= SSKBlock.DATA_LENGTH &&
block.clientMetadata.isTrivial()) {
- short codec;
- if(bestCodec == null) {
- codec = -1;
- } else {
- codec = bestCodec.codecNumberForMetadata();
- }
- try {
- isk =
InsertableClientSSK.create(block.desiredURI);
- } catch (MalformedURLException e1) {
- throw new
InserterException(InserterException.INVALID_URI, e1, null);
- }
- ClientSSKBlock ssk;
- try {
- ssk = isk.encode(data, metadata, false, codec,
data.size(), ctx.random);
- } catch (SSKEncodeException e) {
- throw new
InserterException(InserterException.INTERNAL_ERROR, e, isk.getURI());
- } catch (IOException e) {
- throw new
InserterException(InserterException.INTERNAL_ERROR, e, isk.getURI());
- }
- return simplePutSSK(ssk, getCHKOnly, noRetries);
- }
-
- if(isSSK) {
- // Insert as CHK
- // Create metadata pointing to it (include the clientMetadata if there is any).
- FreenetURI uri = run(new InsertBlock(block.data, null, FreenetURI.EMPTY_CHK_URI), metadata, getCHKOnly, noRetries, null);
- Metadata m = new Metadata(Metadata.SIMPLE_REDIRECT, uri, block.clientMetadata);
- Bucket bucket;
- try {
- bucket = BucketTools.makeImmutableBucket(ctx.bf, m.writeToByteArray());
- } catch (IOException e) {
- throw new InserterException(InserterException.INTERNAL_ERROR, e, isk.getURI());
- }
- return run(new InsertBlock(bucket, null, block.desiredURI), true, getCHKOnly, noRetries, null);
- }
-
- if(data.size() <= NodeCHK.BLOCK_SIZE) {
- try {
- if(bestCodec == null) {
- chk = ClientCHKBlock.encode(data, metadata, true, (short)-1, 0);
- } else {
- if(origSize > ClientCHKBlock.MAX_LENGTH_BEFORE_COMPRESSION)
- throw new IllegalArgumentException("Data too big to compress into single block, but it does");
- chk = ClientCHKBlock.encode(data, metadata, false, bestCodec.codecNumberForMetadata(), (int)origSize);
- }
- } catch (IOException e) {
- throw new InserterException(InserterException.BUCKET_ERROR, e, null);
- } catch (CHKEncodeException e) {
- Logger.error(this, "Unexpected error: "+e, e);
- throw new InserterException(InserterException.INTERNAL_ERROR, null);
- }
- return simplePutCHK(chk, block.clientMetadata, getCHKOnly, noRetries, returnMetadata);
- }
-
- // Too big, encode to a splitfile
- SplitInserter splitInsert = new SplitInserter(data, block.clientMetadata, bestCodec, ctx.splitfileAlgorithm, ctx, this, NodeCHK.BLOCK_SIZE, getCHKOnly, metadata, returnMetadata);
- return splitInsert.run();
- }
-
- /**
- * Simple insert. Only complication is that it might have some client metadata.
- * @param chk The data encoded into a single CHK.
- * @param clientMetadata The client metadata. If this is non-trivial, we will have to
- * create a redirect document just to put the metadata in.
- * @param returnMetadata If not null, return the metadata in this bucket, rather
- * than inserting it; return the *data* CHK only. This is used by e.g.
- * MultiFileInserter, where we will aggregate the metadata elsewhere.
- * @return The URI of the resulting CHK.
- * @throws InserterException If there was an error inserting the block.
- */
- private FreenetURI simplePutCHK(ClientCHKBlock chk, ClientMetadata clientMetadata, boolean getCHKOnly, boolean noRetries, Bucket returnMetadata) throws InserterException {
- LowLevelPutException le = null;
- int rnfs = 0;
- for(int i=0;i<=ctx.maxInsertRetries;i++) {
- try {
- if(!getCHKOnly)
- ctx.eventProducer.produceEvent(new SimpleBlockPutEvent(chk.getClientKey()));
- if(!getCHKOnly)
- ctx.client.putKey(chk, ctx.starterClient, ctx.cacheLocalRequests);
- break;
- } catch (LowLevelPutException e) {
- le = e;
- switch(le.code) {
- case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
- case LowLevelPutException.REJECTED_OVERLOAD:
- rnfs = 0;
- }
- if(noRetries)
- break;
- if(le.code == LowLevelPutException.ROUTE_NOT_FOUND && ctx.consecutiveRNFsCountAsSuccess > 0) {
- rnfs++;
- if(rnfs >= ctx.consecutiveRNFsCountAsSuccess) {
- le = null;
- break;
- }
- }
- }
- }
-
- FreenetURI uri;
-
- if(clientMetadata == null || clientMetadata.isTrivial())
- // Don't need a redirect for the metadata
- uri = chk.getClientKey().getURI();
- else {
- // Do need a redirect for the metadata
- Metadata metadata = new Metadata(Metadata.SIMPLE_REDIRECT, chk.getClientKey().getURI(), clientMetadata);
- if(returnMetadata != null) {
- uri = chk.getClientKey().getURI();
- try {
- DataOutputStream dos = new DataOutputStream(returnMetadata.getOutputStream());
- metadata.writeTo(dos);
- dos.close();
- } catch (IOException e) {
- throw new InserterException(InserterException.BUCKET_ERROR);
- }
- } else {
- uri = putMetadataCHK(metadata, getCHKOnly, noRetries);
- }
- }
-
- if(le != null)
- translateException(le, uri);
-
- return uri;
- }
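The loop at the top of simplePutCHK is the insert retry policy: up to
ctx.maxInsertRetries attempts, where a run of ctx.consecutiveRNFsCountAsSuccess
consecutive ROUTE_NOT_FOUND results is treated as success (the insert has
almost certainly propagated), while a ROUTE_REALLY_NOT_FOUND or
REJECTED_OVERLOAD resets the run. A self-contained sketch of just that
counting logic; the codes and method are illustrative, not Freenet API:

    // Sketch of the consecutive-RNF retry policy above.
    class RetryPolicy {
        static final int OK = 0, RNF = 1, REALLY_RNF = 2, OVERLOAD = 3;

        /** @param outcomes result code of each successive putKey() attempt
         *  @return true if the insert should be treated as a success */
        static boolean insertSucceeded(int maxRetries, int rnfsToSuccess,
                int[] outcomes) {
            int rnfs = 0;
            for (int i = 0; i <= maxRetries && i < outcomes.length; i++) {
                int code = outcomes[i];
                if (code == OK) return true;          // clean success
                if (code == REALLY_RNF || code == OVERLOAD)
                    rnfs = 0;                         // breaks the RNF run
                if (code == RNF && rnfsToSuccess > 0 && ++rnfs >= rnfsToSuccess)
                    return true;                      // run long enough: success
            }
            return false;                             // retries exhausted
        }
    }

With rnfsToSuccess = 3, for example, the sequence OVERLOAD, RNF, RNF, RNF is
treated as a success, while RNF, OVERLOAD, RNF, RNF is not.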
-
- private FreenetURI simplePutSSK(ClientSSKBlock ssk, boolean getCHKOnly,
boolean noRetries) throws InserterException {
- LowLevelPutException le = null;
- int rnfs = 0;
- for(int i=0;i<=ctx.maxInsertRetries;i++) {
- try {
- if(!getCHKOnly)
- ctx.eventProducer.produceEvent(new
SimpleBlockPutEvent(ssk.getClientKey()));
- if(!getCHKOnly)
- ctx.client.putKey(ssk,
ctx.starterClient, ctx.cacheLocalRequests);
- break;
- } catch (LowLevelPutException e) {
- le = e;
- switch(le.code) {
- case
LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
- case LowLevelPutException.REJECTED_OVERLOAD:
- rnfs = 0;
- }
- if(noRetries)
- break;
- if(le.code ==
LowLevelPutException.ROUTE_NOT_FOUND && ctx.consecutiveRNFsCountAsSuccess > 0) {
- rnfs++;
- if(rnfs >=
ctx.consecutiveRNFsCountAsSuccess) {
- le = null;
- break;
- }
- }
- if(le.code == LowLevelPutException.COLLISION)
- break;
- }
- }
-
- FreenetURI uri = ssk.getClientKey().getURI();
-
- if(le != null)
- translateException(le, uri);
-
- return uri;
- }
-
- private void translateException(LowLevelPutException e, FreenetURI uri)
throws InserterException {
- switch(e.code) {
- case LowLevelPutException.INTERNAL_ERROR:
- throw new
InserterException(InserterException.INTERNAL_ERROR, e, null);
- case LowLevelPutException.REJECTED_OVERLOAD:
- throw new
InserterException(InserterException.REJECTED_OVERLOAD, uri);
- case LowLevelPutException.ROUTE_NOT_FOUND:
- throw new
InserterException(InserterException.ROUTE_NOT_FOUND, uri);
- case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
- throw new
InserterException(InserterException.ROUTE_REALLY_NOT_FOUND, uri);
- case LowLevelPutException.COLLISION:
- throw new
InserterException(InserterException.COLLISION, uri);
- default:
- Logger.error(this, "Unknown LowLevelPutException code:
"+e.code+" on "+this);
- throw new
InserterException(InserterException.INTERNAL_ERROR, e, null);
- }
- }
-
- /** Put a metadata CHK
- * @throws InserterException If the insert fails.
- */
- private FreenetURI putMetadataCHK(Metadata metadata, boolean
getCHKOnly, boolean noRetries) throws InserterException {
- byte[] data = metadata.writeToByteArray();
- Bucket bucket;
- try {
- bucket = BucketTools.makeImmutableBucket(ctx.bf, data);
- } catch (IOException e) {
- throw new
InserterException(InserterException.BUCKET_ERROR, null);
- }
- InsertBlock block = new InsertBlock(bucket, null,
FreenetURI.EMPTY_CHK_URI);
- return run(block, true, getCHKOnly, noRetries, null);
- }
-}
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClient.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClient.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClient.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -4,7 +4,6 @@
import freenet.client.events.ClientEventListener;
import freenet.keys.FreenetURI;
-import freenet.node.RequestStarterClient;
public interface HighLevelSimpleClient {
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,20 +1,19 @@
package freenet.client;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.util.HashMap;
+import freenet.client.async.ClientGetter;
+import freenet.client.async.ClientPutter;
+import freenet.client.async.SimpleManifestPutter;
import freenet.client.events.ClientEventListener;
import freenet.client.events.ClientEventProducer;
import freenet.client.events.EventLogger;
import freenet.client.events.SimpleEventProducer;
import freenet.crypt.RandomSource;
-import freenet.keys.ClientCHK;
import freenet.keys.ClientKey;
import freenet.keys.FreenetURI;
-import freenet.keys.InsertableClientSSK;
-import freenet.node.RequestStarterClient;
-import freenet.node.SimpleLowLevelClient;
+import freenet.node.Node;
import freenet.support.Bucket;
import freenet.support.BucketFactory;
import freenet.support.BucketTools;
@@ -22,17 +21,16 @@
public class HighLevelSimpleClientImpl implements HighLevelSimpleClient {
- private final SimpleLowLevelClient client;
private final ArchiveManager archiveManager;
+ private final short priorityClass;
private final BucketFactory bucketFactory;
+ private final Node node;
/** One CEP for all requests and inserts */
private final ClientEventProducer globalEventProducer;
private long curMaxLength;
private long curMaxTempLength;
private int curMaxMetadataLength;
private final RandomSource random;
- private final RequestStarterClient requestStarter;
- private final RequestStarterClient insertStarter;
/** See comments in Node */
private final boolean cacheLocalRequests;
static final int MAX_RECURSION = 10;
@@ -67,9 +65,10 @@
static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 64;
- public HighLevelSimpleClientImpl(SimpleLowLevelClient client,
ArchiveManager mgr, BucketFactory bf, RandomSource r, RequestStarterClient
requestStarterClient, RequestStarterClient insertStarterClient, boolean
cacheLocalRequests) {
- this.client = client;
+ public HighLevelSimpleClientImpl(Node node, ArchiveManager mgr,
BucketFactory bf, RandomSource r, boolean cacheLocalRequests, short
priorityClass) {
+ this.node = node;
archiveManager = mgr;
+ this.priorityClass = priorityClass;
bucketFactory = bf;
random = r;
this.globalEventProducer = new SimpleEventProducer();
@@ -77,8 +76,6 @@
curMaxLength = Long.MAX_VALUE;
curMaxTempLength = Long.MAX_VALUE;
curMaxMetadataLength = 1024 * 1024;
- this.requestStarter = requestStarterClient;
- this.insertStarter = insertStarterClient;
this.cacheLocalRequests = cacheLocalRequests;
}
@@ -96,23 +93,25 @@
public FetchResult fetch(FreenetURI uri) throws FetchException {
if(uri == null) throw new NullPointerException();
FetcherContext context = getFetcherContext();
- Fetcher f = new Fetcher(uri, context);
- return f.run();
+ FetchWaiter fw = new FetchWaiter();
+ ClientGetter get = new ClientGetter(fw, node.fetchScheduler,
uri, context, priorityClass);
+ get.start();
+ return fw.waitForCompletion();
}
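fetch() now bridges the new asynchronous ClientGetter back to a blocking call:
a FetchWaiter is handed to the getter as its callback and the calling thread
parks in waitForCompletion() until the request finishes. FetchWaiter itself is
added elsewhere in this commit (presumably implementing the new ClientCallback
interface); a hypothetical minimal latch with the same shape:

    // Hypothetical latch in the style of FetchWaiter/PutWaiter; not the
    // actual implementation added by this commit.
    class ResultWaiter<R> {
        private R result;
        private Exception error;
        private boolean finished;

        public synchronized void onSuccess(R r) {
            result = r; finished = true; notifyAll();
        }

        public synchronized void onFailure(Exception e) {
            error = e; finished = true; notifyAll();
        }

        /** Blocks until the async side calls one of the handlers above. */
        public synchronized R waitForCompletion() throws Exception {
            while (!finished)
                wait();                 // woken by notifyAll() above
            if (error != null) throw error;
            return result;
        }
    }

PutWaiter plays the same role for insert(), below.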
public FreenetURI insert(InsertBlock insert, boolean getCHKOnly) throws
InserterException {
+ return insert(insert, getCHKOnly, false);
+ }
+
+ public FreenetURI insert(InsertBlock insert, boolean getCHKOnly,
boolean isMetadata) throws InserterException {
InserterContext context = getInserterContext();
- FileInserter i = new FileInserter(context);
- return i.run(insert, false, getCHKOnly, false, null);
+ PutWaiter pw = new PutWaiter();
+ ClientPutter put = new ClientPutter(pw, insert.data,
insert.desiredURI, insert.clientMetadata,
+ context, node.putScheduler, priorityClass,
getCHKOnly, isMetadata);
+ put.start();
+ return pw.waitForCompletion();
}
- public FreenetURI insert(InsertBlock insert, boolean getCHKOnly,
boolean metadata) throws InserterException {
- InserterContext context = new InserterContext(client,
bucketFactory, random, INSERT_RETRIES, CONSECUTIVE_RNFS_ASSUME_SUCCESS,
- SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, insertStarter, cacheLocalRequests);
- FileInserter i = new FileInserter(context);
- return i.run(insert, metadata, getCHKOnly, false, null);
- }
-
public FreenetURI insertRedirect(FreenetURI insertURI, FreenetURI
targetURI) throws InserterException {
Metadata m = new Metadata(Metadata.SIMPLE_REDIRECT, targetURI,
new ClientMetadata());
Bucket b;
@@ -124,17 +123,14 @@
}
ClientKey k;
InsertBlock block = new InsertBlock(b, null, insertURI);
- InserterContext context = new InserterContext(client,
bucketFactory, random, INSERT_RETRIES, CONSECUTIVE_RNFS_ASSUME_SUCCESS,
- SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, insertStarter, cacheLocalRequests);
- FileInserter i = new FileInserter(context);
- return i.run(block, true, false, false, null);
+ return insert(block, false, true);
}
public FreenetURI insertManifest(FreenetURI insertURI, HashMap
bucketsByName, String defaultName) throws InserterException {
- InserterContext context = new InserterContext(client,
bucketFactory, random, INSERT_RETRIES, CONSECUTIVE_RNFS_ASSUME_SUCCESS,
- SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, insertStarter, cacheLocalRequests);
- MultiFileInserter mfi = new MultiFileInserter(insertURI,
bucketsByName, context, defaultName);
- return mfi.run();
+ PutWaiter pw = new PutWaiter();
+ SimpleManifestPutter putter =
+ new SimpleManifestPutter(pw, node.putScheduler,
bucketsByName, priorityClass, insertURI, defaultName, getInserterContext(),
false);
+ putter.start();
+ return pw.waitForCompletion();
}
public void addGlobalHook(ClientEventListener listener) {
@@ -143,16 +139,16 @@
public FetcherContext getFetcherContext() {
return
- new FetcherContext(client, curMaxLength,
curMaxTempLength, curMaxMetadataLength,
+ new FetcherContext(curMaxLength, curMaxTempLength,
curMaxMetadataLength,
MAX_RECURSION, MAX_ARCHIVE_RESTARTS,
DONT_ENTER_IMPLICIT_ARCHIVES,
SPLITFILE_THREADS, SPLITFILE_BLOCK_RETRIES,
NON_SPLITFILE_RETRIES,
FETCH_SPLITFILES, FOLLOW_REDIRECTS,
LOCAL_REQUESTS_ONLY,
MAX_SPLITFILE_BLOCKS_PER_SEGMENT,
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
- random, archiveManager, bucketFactory,
globalEventProducer, requestStarter, cacheLocalRequests);
+ random, archiveManager, bucketFactory,
globalEventProducer, cacheLocalRequests);
}
public InserterContext getInserterContext() {
- return new InserterContext(client, bucketFactory, random,
INSERT_RETRIES, CONSECUTIVE_RNFS_ASSUME_SUCCESS,
- SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, insertStarter, cacheLocalRequests);
+ return new InserterContext(bucketFactory, random,
INSERT_RETRIES, CONSECUTIVE_RNFS_ASSUME_SUCCESS,
+ SPLITFILE_INSERT_THREADS,
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
globalEventProducer, cacheLocalRequests);
}
}
Modified: trunk/freenet/src/freenet/client/InsertBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertBlock.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/client/InsertBlock.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -9,8 +9,8 @@
public class InsertBlock {
Bucket data;
- final FreenetURI desiredURI;
- final ClientMetadata clientMetadata;
+ public final FreenetURI desiredURI;
+ public final ClientMetadata clientMetadata;
public InsertBlock(Bucket data, ClientMetadata metadata, FreenetURI
desiredURI) {
this.data = data;
@@ -20,7 +20,9 @@
clientMetadata = metadata;
this.desiredURI = desiredURI;
}
+
+ public Bucket getData() {
+ return data;
+ }
-
-
}
Deleted: trunk/freenet/src/freenet/client/InsertSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertSegment.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/client/InsertSegment.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -1,69 +0,0 @@
-package freenet.client;
-
-import java.io.IOException;
-
-import freenet.keys.FreenetURI;
-import freenet.support.BucketFactory;
-import freenet.support.Logger;
-
-/**
- * Segment of a splitfile, for insertion purposes.
- */
-public class InsertSegment {
-
- final FECCodec codec;
- final SplitfileBlock[] origDataBlocks;
- final int blockLength;
- final BucketFactory bf;
- /** Check blocks. Will be created by encode(...). */
- final SplitfileBlock[] checkBlocks;
- final boolean getCHKOnly;
- // just for debugging
- final int segNo;
-
- public InsertSegment(short splitfileAlgo, SplitfileBlock[]
origDataBlocks, int blockLength, BucketFactory bf, boolean getCHKOnly, int
segNo) {
- this.origDataBlocks = origDataBlocks;
- codec = FECCodec.getCodec(splitfileAlgo, origDataBlocks.length);
- if(codec != null)
- checkBlocks = new
SplitfileBlock[codec.countCheckBlocks()];
- else
- checkBlocks = new SplitfileBlock[0];
- this.blockLength = blockLength;
- this.bf = bf;
- this.getCHKOnly = getCHKOnly;
- this.segNo = segNo;
- // FIXME: remove debugging code
- for(int i=0;i<origDataBlocks.length;i++)
- if(origDataBlocks[i].getData() == null) throw new
NullPointerException("Block "+i+" of "+origDataBlocks.length+" data blocks of
seg "+segNo+" is null");
- }
-
- /**
- * Get the check block URIs.
- * Don't call before encode()! Don't call before all blocks have
been inserted, either.
- */
- public FreenetURI[] getCheckURIs() {
- FreenetURI[] uris = new FreenetURI[checkBlocks.length];
- for(int i=0;i<uris.length;i++) {
- FreenetURI uri = checkBlocks[i].getURI();
- uris[i] = uri;
- }
- return uris;
- }
-
- /**
- * Encode the data blocks into check blocks.
- * @return The number of check blocks generated.
- * @throws IOException If the encode fails due to a bucket error.
- */
- public int encode(int offset, RetryTracker tracker, InserterContext
ctx) throws IOException {
- Logger.minor(this, "Encoding "+segNo+":
"+origDataBlocks.length+" into "+checkBlocks.length);
- if(codec == null) return 0; // no FEC
- for(int i=0;i<checkBlocks.length;i++)
- checkBlocks[i] = new BlockInserter(null, offset + i,
tracker, ctx, getCHKOnly);
- codec.encode(origDataBlocks, checkBlocks, blockLength, bf);
- for(int i=0;i<checkBlocks.length;i++)
- tracker.addBlock(checkBlocks[i]);
- return checkBlocks.length;
- }
-
-}
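encode() above is the insert-side FEC step: it materialises the check-block
slots as BlockInserters, runs the codec across the data blocks, and queues
every check block on the RetryTracker. Assuming the onion code is MDS, which
is what minFetched = dataBlocks.length in the deleted Segment below implies,
the redundancy arithmetic for the 128/192 parameters visible in SplitFetcher
works out as follows:

    // Illustrative arithmetic only, using the ONION_STD_K = 128,
    // ONION_STD_N = 192 constants from the deleted SplitFetcher below.
    public class SegmentRedundancy {
        public static void main(String[] args) {
            int data = 128, check = 64;
            int total = data + check;            // 192 blocks inserted
            System.out.println("decode needs any " + data + " of " + total);
            System.out.println("overhead = " + ((double) total / data) + "x"); // 1.5x
        }
    }

Any 128 of a segment's 192 blocks suffice to reconstruct it, at a 1.5x
storage cost.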
Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/InserterContext.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -2,34 +2,29 @@
import freenet.client.events.ClientEventProducer;
import freenet.crypt.RandomSource;
-import freenet.node.RequestStarterClient;
-import freenet.node.SimpleLowLevelClient;
import freenet.support.BucketFactory;
/** Context object for an insert operation, including both simple and
multi-file inserts */
public class InserterContext {
- final SimpleLowLevelClient client;
- final BucketFactory bf;
+ public final BucketFactory bf;
/** If true, don't try to compress the data */
- final boolean dontCompress;
- final RandomSource random;
- final short splitfileAlgorithm;
+ public final boolean dontCompress;
+ public final RandomSource random;
+ public final short splitfileAlgorithm;
public int maxInsertRetries;
final int maxSplitInsertThreads;
final int consecutiveRNFsCountAsSuccess;
- final int splitfileSegmentDataBlocks;
- final int splitfileSegmentCheckBlocks;
+ public final int splitfileSegmentDataBlocks;
+ public final int splitfileSegmentCheckBlocks;
final ClientEventProducer eventProducer;
- final RequestStarterClient starterClient;
/** Interesting tradeoff, see comments at top of Node.java. */
final boolean cacheLocalRequests;
private boolean cancelled;
- public InserterContext(SimpleLowLevelClient client, BucketFactory bf,
RandomSource random,
+ public InserterContext(BucketFactory bf, RandomSource random,
int maxRetries, int rnfsToSuccess, int maxThreads, int
splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
- ClientEventProducer eventProducer, RequestStarterClient
sctx, boolean cacheLocalRequests) {
- this.client = client;
+ ClientEventProducer eventProducer, boolean
cacheLocalRequests) {
this.bf = bf;
this.random = random;
dontCompress = false;
@@ -40,12 +35,10 @@
this.eventProducer = eventProducer;
this.splitfileSegmentDataBlocks = splitfileSegmentDataBlocks;
this.splitfileSegmentCheckBlocks = splitfileSegmentCheckBlocks;
- this.starterClient = sctx;
this.cacheLocalRequests = cacheLocalRequests;
}
public InserterContext(InserterContext ctx) {
- this.client = ctx.client;
this.bf = ctx.bf;
this.random = ctx.random;
this.dontCompress = ctx.dontCompress;
@@ -56,7 +49,6 @@
this.eventProducer = ctx.eventProducer;
this.splitfileSegmentDataBlocks =
ctx.splitfileSegmentDataBlocks;
this.splitfileSegmentCheckBlocks =
ctx.splitfileSegmentCheckBlocks;
- this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
}
Modified: trunk/freenet/src/freenet/client/InserterException.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterException.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/InserterException.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -82,6 +82,8 @@
public static final int ROUTE_REALLY_NOT_FOUND = 8;
/** Collided with pre-existing content */
public static final int COLLISION = 9;
+ /** Cancelled by user */
+ public static final int CANCELLED = 10;
public static String getMessage(int mode) {
switch(mode) {
@@ -103,8 +105,51 @@
return "Insert could not leave the node at all";
case COLLISION:
return "Insert collided with different, pre-existing
data at the same key";
+ case CANCELLED:
+ return "Cancelled by user";
default:
return "Unknown error "+mode;
}
}
+
+ /** Is this error fatal? Non-fatal errors are errors which are likely
to go away with
+ * more retries, or at least for which there is some point retrying.
+ */
+ public boolean isFatal() {
+ return isFatal(mode);
+ }
+
+ public static boolean isFatal(int mode) {
+ switch(mode) {
+ case INVALID_URI:
+ case FATAL_ERRORS_IN_BLOCKS:
+ case COLLISION:
+ case CANCELLED:
+ return true;
+ case BUCKET_ERROR: // maybe
+ case INTERNAL_ERROR: // maybe
+ case REJECTED_OVERLOAD:
+ case TOO_MANY_RETRIES_IN_BLOCKS:
+ case ROUTE_NOT_FOUND:
+ case ROUTE_REALLY_NOT_FOUND:
+ return false;
+ default:
+ Logger.error(InserterException.class, "Error unknown to
isFatal(): "+getMessage(mode));
+ return false;
+ }
+ }
+
+ public static InserterException construct(FailureCodeTracker errors) {
+ if(errors == null) return null;
+ if(errors.isEmpty()) return null;
+ if(errors.isOneCodeOnly()) {
+ return new InserterException(errors.getFirstCode());
+ }
+ int mode;
+ if(errors.isFatal(true))
+ mode = FATAL_ERRORS_IN_BLOCKS;
+ else
+ mode = TOO_MANY_RETRIES_IN_BLOCKS;
+ return new InserterException(mode, errors, null);
+ }
}
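construct() collapses a FailureCodeTracker into the most specific exception
available: null when nothing failed, the single code when only one kind of
error occurred, otherwise FATAL_ERRORS_IN_BLOCKS or TOO_MANY_RETRIES_IN_BLOCKS
depending on whether any fatal code is present. A hedged usage sketch: inc()
and the boolean (isInsert) constructor appear in the deleted RetryTracker and
MultiFileInserter below, while the caller itself is hypothetical:

    // Hypothetical caller, e.g. at the end of a splitfile insert round.
    FailureCodeTracker errors = new FailureCodeTracker(true); // true = insert
    errors.inc(InserterException.ROUTE_NOT_FOUND);
    errors.inc(InserterException.REJECTED_OVERLOAD);
    InserterException e = InserterException.construct(errors);
    if (e != null) {
        if (e.isFatal())
            throw e;    // COLLISION, CANCELLED etc.: retrying is pointless
        // otherwise the failed blocks are worth another retry round
    }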
Modified: trunk/freenet/src/freenet/client/Metadata.java
===================================================================
--- trunk/freenet/src/freenet/client/Metadata.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/client/Metadata.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -19,13 +19,104 @@
/** Metadata parser/writer class. */
-public class Metadata {
+public class Metadata implements Cloneable {
static final long FREENET_METADATA_MAGIC = 0xf053b2842d91482bL;
static final int MAX_SPLITFILE_PARAMS_LENGTH = 32768;
/** Soft limit, to avoid memory DoS */
static final int MAX_SPLITFILE_BLOCKS = 100*1000;
+ // Actual parsed data
+
+ // document type
+ byte documentType;
+ public static final byte SIMPLE_REDIRECT = 0;
+ static final byte MULTI_LEVEL_METADATA = 1;
+ static final byte SIMPLE_MANIFEST = 2;
+ static final byte ZIP_MANIFEST = 3;
+ static final byte ZIP_INTERNAL_REDIRECT = 4;
+
+ // 2 bytes of flags
+ /** Is a splitfile */
+ boolean splitfile;
+ /** Is a DBR */
+ boolean dbr;
+ /** No MIME type; on by default as not all doctypes have MIME */
+ boolean noMIME = true;
+ /** Compressed MIME type */
+ boolean compressedMIME;
+ /** Has extra client-metadata */
+ boolean extraMetadata;
+ /** Keys stored in full (otherwise assumed to be CHKs) */
+ boolean fullKeys;
+ /** Non-final splitfile chunks can be non-full */
+ boolean splitUseLengths;
+ static final short FLAGS_SPLITFILE = 1;
+ static final short FLAGS_DBR = 2;
+ static final short FLAGS_NO_MIME = 4;
+ static final short FLAGS_COMPRESSED_MIME = 8;
+ static final short FLAGS_EXTRA_METADATA = 16;
+ static final short FLAGS_FULL_KEYS = 32;
+ static final short FLAGS_SPLIT_USE_LENGTHS = 64;
+ static final short FLAGS_COMPRESSED = 128;
+
+ /** Container archive type */
+ short archiveType;
+ static final short ARCHIVE_ZIP = 0;
+ static final short ARCHIVE_TAR = 1; // FIXME for future use
+
+ /** Compressed splitfile codec */
+ short compressionCodec = -1;
+ static public final short COMPRESS_GZIP = 0;
+ static final short COMPRESS_BZIP2 = 1; // FIXME for future use
+
+ /** The length of the splitfile */
+ long dataLength;
+ /** The decompressed length of the compressed data */
+ long decompressedLength;
+
+ /** The MIME type, as a string */
+ String mimeType;
+
+ /** The compressed MIME type - lookup index for the MIME types table.
+ * Must be between 0 and 32767.
+ */
+ short compressedMIMEValue;
+ boolean hasCompressedMIMEParams;
+ short compressedMIMEParams;
+
+ /** The simple redirect key */
+ FreenetURI simpleRedirectKey;
+
+ short splitfileAlgorithm;
+ static public final short SPLITFILE_NONREDUNDANT = 0;
+ static public final short SPLITFILE_ONION_STANDARD = 1;
+
+ /** Splitfile parameters */
+ byte[] splitfileParams;
+ int splitfileBlocks;
+ int splitfileCheckBlocks;
+ FreenetURI[] splitfileDataKeys;
+ FreenetURI[] splitfileCheckKeys;
+
+ // Manifests
+ int manifestEntryCount;
+ /** Manifest entries by name */
+ HashMap manifestEntries;
+
+ /** ZIP internal redirect: name of file in ZIP */
+ String nameInArchive;
+
+ ClientMetadata clientMetadata;
+
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException e) {
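+ // Cannot happen: Metadata implements Cloneable, so super.clone() never fails.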
+ throw new Error("Yes it is!");
+ }
+ }
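Each FLAGS_* constant above claims one bit of the 16-bit flags field, and the
boolean fields map onto it one-for-one when metadata is serialised. A
self-contained sketch of the packing, with the constant values inlined as
comments (the real work is done by writeTo() and the parsing constructor):

    // Sketch of the bit packing implied by the FLAGS_* constants above.
    class FlagPacking {
        static short packFlags(boolean splitfile, boolean dbr, boolean noMIME,
                boolean compressedMIME, boolean extraMetadata, boolean fullKeys) {
            short flags = 0;
            if (splitfile)      flags |= 1;   // FLAGS_SPLITFILE
            if (dbr)            flags |= 2;   // FLAGS_DBR
            if (noMIME)         flags |= 4;   // FLAGS_NO_MIME
            if (compressedMIME) flags |= 8;   // FLAGS_COMPRESSED_MIME
            if (extraMetadata)  flags |= 16;  // FLAGS_EXTRA_METADATA
            if (fullKeys)       flags |= 32;  // FLAGS_FULL_KEYS
            return flags;
        }

        static boolean isSplitfile(short flags) {
            return (flags & 1) != 0;          // FLAGS_SPLITFILE
        }
    }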
+
/** Parse a block of bytes into a Metadata structure.
* Constructor method because of need to catch impossible exceptions.
* @throws MetadataParseException If the metadata is invalid.
@@ -463,7 +554,7 @@
/**
* Write the data to a byte array.
*/
- byte[] writeToByteArray() {
+ public byte[] writeToByteArray() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
try {
@@ -507,89 +598,6 @@
} else throw new IllegalArgumentException("Full keys
must be enabled to write non-CHKs");
}
}
- // Actual parsed data
-
- // document type
- byte documentType;
- static final byte SIMPLE_REDIRECT = 0;
- static final byte MULTI_LEVEL_METADATA = 1;
- static final byte SIMPLE_MANIFEST = 2;
- static final byte ZIP_MANIFEST = 3;
- static final byte ZIP_INTERNAL_REDIRECT = 4;
-
- // 2 bytes of flags
- /** Is a splitfile */
- boolean splitfile;
- /** Is a DBR */
- boolean dbr;
- /** No MIME type; on by default as not all doctypes have MIME */
- boolean noMIME = true;
- /** Compressed MIME type */
- boolean compressedMIME;
- /** Has extra client-metadata */
- boolean extraMetadata;
- /** Keys stored in full (otherwise assumed to be CHKs) */
- boolean fullKeys;
- /** Non-final splitfile chunks can be non-full */
- boolean splitUseLengths;
- static final short FLAGS_SPLITFILE = 1;
- static final short FLAGS_DBR = 2;
- static final short FLAGS_NO_MIME = 4;
- static final short FLAGS_COMPRESSED_MIME = 8;
- static final short FLAGS_EXTRA_METADATA = 16;
- static final short FLAGS_FULL_KEYS = 32;
- static final short FLAGS_SPLIT_USE_LENGTHS = 64;
- static final short FLAGS_COMPRESSED = 128;
-
- /** Container archive type */
- short archiveType;
- static final short ARCHIVE_ZIP = 0;
- static final short ARCHIVE_TAR = 1; // FIXME for future use
-
- /** Compressed splitfile codec */
- short compressionCodec = -1;
- static public final short COMPRESS_GZIP = 0;
- static final short COMPRESS_BZIP2 = 1; // FIXME for future use
-
- /** The length of the splitfile */
- long dataLength;
- /** The decompressed length of the compressed data */
- long decompressedLength;
-
- /** The MIME type, as a string */
- String mimeType;
-
- /** The compressed MIME type - lookup index for the MIME types table.
- * Must be between 0 and 32767.
- */
- short compressedMIMEValue;
- boolean hasCompressedMIMEParams;
- short compressedMIMEParams;
-
- /** The simple redirect key */
- FreenetURI simpleRedirectKey;
-
- short splitfileAlgorithm;
- static final short SPLITFILE_NONREDUNDANT = 0;
- static final short SPLITFILE_ONION_STANDARD = 1;
-
- /** Splitfile parameters */
- byte[] splitfileParams;
- int splitfileBlocks;
- int splitfileCheckBlocks;
- FreenetURI[] splitfileDataKeys;
- FreenetURI[] splitfileCheckKeys;
-
- // Manifests
- int manifestEntryCount;
- /** Manifest entries by name */
- HashMap manifestEntries;
-
- /** ZIP internal redirect: name of file in ZIP */
- String nameInArchive;
-
- ClientMetadata clientMetadata;
-
/** Is a manifest? */
public boolean isSimpleManifest() {
return documentType == SIMPLE_MANIFEST;
@@ -799,4 +807,20 @@
public boolean isCompressed() {
return compressionCodec >= 0;
}
+
+ public boolean splitUseLengths() {
+ return splitUseLengths;
+ }
+
+ public short getCompressionCodec() {
+ return compressionCodec;
+ }
+
+ public long dataLength() {
+ return dataLength;
+ }
+
+ public byte[] splitfileParams() {
+ return splitfileParams;
+ }
}
Deleted: trunk/freenet/src/freenet/client/MultiFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/MultiFileInserter.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/MultiFileInserter.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,191 +0,0 @@
-package freenet.client;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import freenet.keys.FreenetURI;
-import freenet.support.ArrayBucket;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-
-public class MultiFileInserter {
-
- public class MFInserter implements Runnable {
-
- final int num;
-
- MFInserter(int x) {
- num = x;
- }
-
- public void run() {
- try {
- while(true) {
- String name = null;
- Bucket data = null;
- synchronized(bucketsByName) {
- if(bucketsByName.isEmpty()) break;
- name = (String)
bucketsByName.keySet().iterator().next();
- data = (Bucket)
bucketsByName.remove(name);
- }
- String mimeType =
DefaultMIMETypes.guessMIMEType(name);
- Logger.minor(this, "Name: "+name+"\nBucket
size: "+data.size()+"\nGuessed MIME type: "+mimeType);
- byte[] metaByteArray;
- try {
- metaByteArray = getMetadata(name, data,
mimeType);
- } catch (InserterException e) {
- Logger.normal(this, "Error inserting
"+name+": "+e.getMessage());
- errorCodes.inc(e.getMode());
- synchronized(this) {
- errors++;
- }
- continue;
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t);
-
errorCodes.inc(InserterException.INTERNAL_ERROR);
- synchronized(this) {
- errors++;
- }
- continue;
- }
- if(metaByteArray != null) {
- synchronized(namesToMetadataByteArrays)
{
-
namesToMetadataByteArrays.put(name, metaByteArray);
- }
- Logger.minor(this, "Inserted "+name);
- } else {
- Logger.normal(this, "Insert failed:
"+name);
- }
-
- }
- } finally {
- synchronized(MultiFileInserter.this) {
- finished[num] = true;
- MultiFileInserter.this.notifyAll();
- }
- }
- }
-
- }
-
- final FreenetURI targetURI;
- final HashMap bucketsByName;
- final InserterContext ctx;
- final String defaultName;
- final HashMap namesToMetadataByteArrays;
- final FailureCodeTracker errorCodes;
- private int errors;
- private final boolean[] finished;
-
- public MultiFileInserter(FreenetURI insertURI, HashMap bucketsByName,
InserterContext context, String defaultName) {
- this.targetURI = insertURI;
- this.bucketsByName = bucketsByName;
- this.ctx = context;
- this.defaultName = defaultName;
- this.namesToMetadataByteArrays = new HashMap();
- this.errorCodes = new FailureCodeTracker(true);
- if(bucketsByName.get(defaultName) == null)
- // FIXME make this an InserterException.
- throw new IllegalArgumentException();
- finished = new boolean[5];
- }
-
- public FreenetURI run() throws InserterException {
- // For each file, guess MIME type, insert it, get the metadata.
- // Then put all the metadata at once into one manifest.
- // Then return it.
-
- // FIXME scaling issues; have to keep everything in RAM...
-
- for(int j=0;j<finished.length;j++) {
- MFInserter it = new MFInserter(j);
- Thread t = new Thread(it, "Inserter #"+j);
- t.setDaemon(true);
- t.start();
- }
-
- synchronized(this) {
- while(true) {
- boolean stillRunning = false;
- for(int i=0;i<finished.length;i++) {
- if(!finished[i]) stillRunning = true;
- }
- if(!stillRunning) break;
- try {
- wait(10000);
- } catch (InterruptedException e) {
- // Impossible??
- }
- }
- }
-
- if(defaultName != null) {
- synchronized(namesToMetadataByteArrays) {
- byte[] defaultData = (byte[])
namesToMetadataByteArrays.get(defaultName);
- if(defaultData != null)
- namesToMetadataByteArrays.put("",
defaultData);
- else {
- Logger.error(this, "Default name
"+defaultName+" does not exist");
-
if(namesToMetadataByteArrays.containsKey(defaultName))
- Logger.error(this, "Default
name exists but has null bytes!");
- // It existed ... and now it doesn't?!
- throw new
InserterException(InserterException.INTERNAL_ERROR);
- }
- }
- }
-
- Metadata manifestMetadata =
Metadata.mkRedirectionManifestWithMetadata(namesToMetadataByteArrays);
-
- Bucket metadata = new
ArrayBucket(manifestMetadata.writeToByteArray());
-
- FileInserter fi = new FileInserter(ctx);
-
- InsertBlock block = new InsertBlock(metadata, null, targetURI);
-
- FreenetURI uri = fi.run(block, true, false, false, null);
-
- if(errors > 0) {
- throw new
InserterException(InserterException.FATAL_ERRORS_IN_BLOCKS, errorCodes, uri);
- }
-
- return uri;
- }
-
- private byte[] getMetadata(String name, Bucket data, String mimeType)
throws InserterException {
- FileInserter fi = new FileInserter(ctx);
- InsertBlock block = new InsertBlock(data, new
ClientMetadata(mimeType), FreenetURI.EMPTY_CHK_URI);
- ArrayBucket metaBucket = new ArrayBucket();
- FreenetURI uri;
- // FIXME make a client event and switch this to logger.log(...)
- System.out.println("Inserting "+name+" ("+data.size()+" bytes,
"+mimeType+")");
- try {
- uri = fi.run(block, false, false, false, metaBucket);
- } catch (InserterException e1) {
- if(e1.uri != null && (e1.getMode() ==
InserterException.COLLISION || e1.getMode() ==
InserterException.ROUTE_NOT_FOUND || e1.getMode() ==
InserterException.ROUTE_REALLY_NOT_FOUND)) {
- Logger.minor(this, "Ignoring "+e1);
- uri = e1.uri;
- } else {
- // Clear the uri.
- throw new InserterException(e1.getMode());
- }
- }
- byte[] metaByteArray;
- if(metaBucket.size() == 0) {
- // It didn't give us any metadata
- Logger.minor(this, "Did not return metadata: creating
our own");
- Metadata m = new Metadata(Metadata.SIMPLE_REDIRECT,
uri, null);
- metaByteArray = m.writeToByteArray();
- if(metaByteArray == null) throw new
NullPointerException();
- } else {
- try {
- metaByteArray =
BucketTools.toByteArray(metaBucket);
- if(metaByteArray == null) throw new
NullPointerException();
- } catch (IOException e) {
- throw new Error(e);
- }
- }
- return metaByteArray;
- }
-
-}
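The deleted MultiFileInserter parallelised over a fixed pool of five
MFInserter threads, each repeatedly removing one (name, bucket) entry from the
shared bucketsByName map under its lock and inserting it outside the lock,
until the map drained. A generic sketch of that worker loop; handle() is a
hypothetical stand-in for the per-file insert-and-collect-metadata step:

    // Generic shape of MFInserter.run()'s drain-a-shared-map loop above.
    import java.util.HashMap;
    import java.util.Map;

    abstract class MapDrainer<V> implements Runnable {
        private final Map<String, V> work;

        MapDrainer(HashMap<String, V> work) { this.work = work; }

        public void run() {
            while (true) {
                String name;
                V value;
                synchronized (work) {
                    if (work.isEmpty()) break;            // nothing left to do
                    name = work.keySet().iterator().next();
                    value = work.remove(name);
                }
                handle(name, value);  // outside the lock, so workers overlap
            }
        }

        abstract void handle(String name, V value);
    }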
Copied: trunk/freenet/src/freenet/client/PutWaiter.java (from rev 7929,
branches/async-client/src/freenet/client/PutWaiter.java)
Deleted: trunk/freenet/src/freenet/client/RetryTracker.java
===================================================================
--- trunk/freenet/src/freenet/client/RetryTracker.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/client/RetryTracker.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -1,392 +0,0 @@
-package freenet.client;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Vector;
-
-import freenet.crypt.RandomSource;
-import freenet.support.Logger;
-
-/**
- * Keeps a list of SplitfileBlocks for each retry level.
- */
-public class RetryTracker {
-
- class Level {
- final int level;
- final Vector blocks;
-
- Level(int l) {
- level = l;
- blocks = new Vector();
- }
-
- /**
- * Return a random block.
- * Call synchronized on RetryTracker.
- */
- SplitfileBlock getBlock() {
- int len = blocks.size();
- int x = random.nextInt(len);
- SplitfileBlock block = (SplitfileBlock)
blocks.remove(x);
- if(blocks.isEmpty())
- removeLevel(level);
- return block;
- }
-
- void add(SplitfileBlock block) {
- blocks.add(block);
- }
-
- /**
- * Remove a specific block.
- * Remove self if run out of blocks.
- * Call synchronized on RetryTracker.
- */
- void remove(SplitfileBlock block) {
- blocks.remove(block);
- if(blocks.isEmpty())
- removeLevel(level);
- }
- }
-
- final FailureCodeTracker fatalErrors;
- final FailureCodeTracker nonfatalErrors;
- final HashMap levels;
- final RandomSource random;
- final int maxLevel;
- final HashSet failedBlocksTooManyRetries;
- final HashSet failedBlocksFatalErrors;
- final HashSet runningBlocks;
- final HashSet succeededBlocks;
- private int curMaxLevel;
- private int curMinLevel;
- /** Maximum number of concurrently running threads */
- final int maxThreads;
- /** After we have successes on this many blocks, we should terminate,
- * even if there are threads running and blocks queued. */
- final int targetSuccesses;
- final boolean killOnFatalError;
- private boolean killed;
- private boolean finishOnEmpty;
- private final RetryTrackerCallback callback;
- private boolean callOnProgress = false;
-
- /**
- * Create a RetryTracker.
- * @param maxLevel The maximum number of tries for each block.
- * @param random The random number source.
- * @param maxThreads The maximum number of threads to use.
- * @param killOnFatalError Whether to terminate the tracker when a fatal
- * error occurs on a single block.
- * @param cb The callback to call .finish(...) when we no longer have
- * anything to do *and* the client has set the finish on empty flag.
- */
- public RetryTracker(int maxLevel, int targetSuccesses, RandomSource
random, int maxThreads, boolean killOnFatalError, RetryTrackerCallback cb,
boolean isInsert) {
- levels = new HashMap();
- fatalErrors = new FailureCodeTracker(isInsert);
- nonfatalErrors = new FailureCodeTracker(isInsert);
- this.targetSuccesses = targetSuccesses;
- this.maxLevel = maxLevel;
- this.random = random;
- curMaxLevel = curMinLevel = 0;
- failedBlocksTooManyRetries = new HashSet();
- failedBlocksFatalErrors = new HashSet();
- runningBlocks = new HashSet();
- succeededBlocks = new HashSet();
- this.maxThreads = maxThreads;
- this.killOnFatalError = killOnFatalError;
- this.finishOnEmpty = false;
- this.callback = cb;
- }
-
- /**
- * Set the finish-on-empty flag to true.
- * This means that when there are no longer any blocks to process, and
there
- * are none running, the tracker will call the client's finish(...)
method.
- */
- public synchronized void setFinishOnEmpty() {
- finishOnEmpty = true;
- }
-
- /** Remove a level */
- private synchronized void removeLevel(int level) {
- Logger.minor(this, "Removing level "+level);
- Integer x = new Integer(level);
- levels.remove(x);
- if(curMinLevel == level) {
- for(int i=curMinLevel;i<=curMaxLevel;i++) {
- x = new Integer(i);
- if(levels.get(x) != null) {
- curMinLevel = i;
- return;
- }
- }
- curMinLevel = curMaxLevel = 0;
- return;
- }
- if(curMaxLevel == level) {
- for(int i=curMaxLevel;i>=curMinLevel;i--) {
- x = new Integer(i);
- if(levels.get(x) != null) {
- curMaxLevel = i;
- return;
- }
- }
- curMinLevel = curMaxLevel = 0;
- return;
- }
- }
-
- /** Add a level */
- private synchronized Level addLevel(int level, Integer x) {
- Logger.minor(this, "Adding level "+level);
- if(level < 0) throw new IllegalArgumentException();
- Level l = new Level(level);
- if(levels.isEmpty()) {
- curMaxLevel = curMinLevel = level;
- } else {
- if(level > curMaxLevel) curMaxLevel = level;
- if(level < curMinLevel) curMinLevel = level;
- }
- levels.put(x, l);
- return l;
- }
-
- /** Get an existing level, or add one if necessary */
- private synchronized Level makeLevel(int level) {
- Integer x = new Integer(level);
- Level l = (Level) levels.get(x);
- if(l == null) {
- return addLevel(level, x);
- }
- else return l;
- }
-
- /**
- * Add a block at retry level zero.
- */
- public synchronized void addBlock(SplitfileBlock block) {
- if(killed) return;
- Level l = makeLevel(0);
- l.add(block);
- maybeStart(true);
- }
-
- /**
- * A block got a nonfatal error and should be retried.
- * Move it out of the running list and back into the relevant list,
unless
- * we have run out of retries.
- */
- public void nonfatalError(SplitfileBlock block, int reasonCode) {
- synchronized(this) {
- nonfatalErrors.inc(reasonCode);
- runningBlocks.remove(block);
- int levelNumber = block.getRetryCount();
- levelNumber++;
- Logger.minor(this, "Non-fatal error on "+block+" ->
"+levelNumber);
- if(levelNumber > maxLevel) {
- failedBlocksTooManyRetries.add(block);
- Logger.minor(this, "Finished with "+block);
- } else {
- Level newLevel = makeLevel(levelNumber);
- newLevel.add(block);
- }
- }
- maybeStart(false);
- if(callOnProgress)
- callback.onProgress();
- }
-
- /**
- * A block got a fatal error and should not be retried.
- * Move it into the fatal error list.
- * @param reasonCode A client-specific code indicating the type of
failure.
- */
- public void fatalError(SplitfileBlock block, int reasonCode) {
- synchronized(this) {
- fatalErrors.inc(reasonCode);
- runningBlocks.remove(block);
- failedBlocksFatalErrors.add(block);
- }
- maybeStart(false);
- if(callOnProgress)
- callback.onProgress();
- }
-
- /**
- * If we can start some blocks, start some blocks.
- * Otherwise if we are finished, call the callback's finish method.
- */
- public void maybeStart(boolean cantCallFinished) {
- boolean callFinished = false;
- synchronized(this) {
- if(killed) return;
- Logger.minor(this, "succeeded: "+succeededBlocks.size()+",
target: "+targetSuccesses+
- ", failed:
"+failedBlocksTooManyRetries.size()+", fatal: "+failedBlocksFatalErrors.size()+
- ", running: "+runningBlocks.size()+", levels:
"+levels.size()+"("+curMinLevel+"-"+curMaxLevel+
- "), finishOnEmpty: "+finishOnEmpty+" for
"+callback);
- if(runningBlocks.size() == 1)
- Logger.minor(this, "Only block running:
"+runningBlocks.toArray()[0]);
- else if(levels.isEmpty()) {
- for(Iterator i=runningBlocks.iterator();i.hasNext();) {
- Logger.minor(this, "Still running: "+i.next());
- }
- }
- if((succeededBlocks.size() >= targetSuccesses)
- || (runningBlocks.isEmpty() && levels.isEmpty()
&& finishOnEmpty)) {
- killed = true;
- Logger.minor(this, "Finishing");
- SplitfileBlock[] running = runningBlocks();
- for(int i=0;i<running.length;i++) {
- running[i].kill();
- }
- runningBlocks.clear();
- if(!cantCallFinished)
- callFinished = true;
- else {
- Runnable r = new Runnable() { public void run()
{ callback.finished(succeededBlocks(), failedBlocks(), fatalErrorBlocks()); } };
- Thread t = new Thread(r);
- t.setDaemon(true);
- t.start();
- }
- } else {
- while(runningBlocks.size() < maxThreads) {
- SplitfileBlock block = getBlock();
- if(block == null) break;
- Logger.minor(this, "Starting: "+block);
- block.start();
- runningBlocks.add(block);
- }
- }
- }
- if(callFinished)
- callback.finished(succeededBlocks(), failedBlocks(),
fatalErrorBlocks());
- }
-
- public void success(SplitfileBlock block) {
- synchronized(this) {
- if(killed) return;
- runningBlocks.remove(block);
- succeededBlocks.add(block);
- }
- maybeStart(false);
- if(callOnProgress)
- callback.onProgress();
- }
-
- public synchronized void callOnProgress() {
- callOnProgress = true;
- }
-
- /**
- * Get the next block to try. This is a randomly selected block from the
- * lowest retry level currently available. The caller moves it into the
- * running list.
- */
- public synchronized SplitfileBlock getBlock() {
- if(killed) return null;
- Integer iMin = new Integer(curMinLevel);
- Level l = (Level) levels.get(iMin);
- if(l == null) {
- if(!(curMinLevel == 0 && curMaxLevel == 0))
- Logger.error(this, "min="+curMinLevel+",
max="+curMaxLevel+" but min does not exist!");
- if(!levels.isEmpty()) {
- Integer[] levelNums = (Integer[])
levels.keySet().toArray(new Integer[levels.size()]);
- java.util.Arrays.sort(levelNums);
- Integer x = levelNums[0];
- curMinLevel = x.intValue();
- Integer y = levelNums[levelNums.length-1];
- curMaxLevel = y.intValue();
- Logger.normal(this, "Corrected:
min="+curMinLevel+", max="+curMaxLevel);
- return getBlock();
- }
- else return null;
- }
- return l.getBlock();
- }
-
- /**
- * Get all running blocks.
- */
- public synchronized SplitfileBlock[] runningBlocks() {
- return (SplitfileBlock[])
- runningBlocks.toArray(new
SplitfileBlock[runningBlocks.size()]);
- }
-
- /**
- * Get all blocks with fatal errors.
- * SplitfileBlock's are assumed to remember their errors, so we don't.
- */
- public synchronized SplitfileBlock[] fatalErrorBlocks() {
- return (SplitfileBlock[])
- failedBlocksFatalErrors.toArray(new
SplitfileBlock[failedBlocksFatalErrors.size()]);
- }
-
- /**
- * Get all blocks which didn't succeed in the maximum number of tries.
- */
- public synchronized SplitfileBlock[] failedBlocks() {
- return (SplitfileBlock[])
- failedBlocksTooManyRetries.toArray(new
SplitfileBlock[failedBlocksTooManyRetries.size()]);
- }
-
- /**
- * Get all successfully downloaded blocks.
- */
- public synchronized SplitfileBlock[] succeededBlocks() {
- return (SplitfileBlock[])
- succeededBlocks.toArray(new
SplitfileBlock[succeededBlocks.size()]);
- }
-
- public synchronized int succeededBlocksLength() {
- return succeededBlocks.size();
- }
-
- /**
- * Count the number of blocks which could not be fetched because we ran
out
- * of retries.
- */
- public synchronized int countFailedBlocks() {
- return failedBlocksTooManyRetries.size();
- }
-
- /**
- * Highest number of completed retries of any block so far.
- */
- public synchronized int highestRetries() {
- return curMaxLevel;
- }
-
- /**
- * Lowest number of completed retries of any block so far.
- */
- public synchronized int lowestRetries() {
- return curMinLevel;
- }
-
- /**
- * Are there more blocks to process?
- */
- public synchronized boolean moreBlocks() {
- return !levels.isEmpty();
- }
-
- public FailureCodeTracker getAccumulatedFatalErrorCodes() {
- return fatalErrors;
- }
-
- public FailureCodeTracker getAccumulatedNonFatalErrorCodes() {
- return nonfatalErrors;
- }
-
- public synchronized void kill() {
- killed = true;
- levels.clear();
- for(Iterator i=runningBlocks.iterator();i.hasNext();) {
- SplitfileBlock sb = (SplitfileBlock) i.next();
- sb.kill();
- }
- runningBlocks.clear();
- }
-}
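The tracker's core data structure is a HashMap from retry level to a Vector of
waiting blocks, with curMinLevel and curMaxLevel maintained by hand so that
getBlock() can serve a uniformly random block from the lowest non-empty level;
blocks with the fewest failed attempts always run first. The same selection
sketched against a TreeMap, which makes the min-level bookkeeping implicit:

    // Equivalent of getBlock()'s lowest-level random selection, written
    // with a TreeMap instead of HashMap + curMinLevel bookkeeping.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.TreeMap;

    class LevelPicker<B> {
        private final TreeMap<Integer, List<B>> levels =
            new TreeMap<Integer, List<B>>();
        private final Random random;

        LevelPicker(Random random) { this.random = random; }

        synchronized void add(B block, int retryCount) {
            List<B> l = levels.get(retryCount);
            if (l == null) levels.put(retryCount, l = new ArrayList<B>());
            l.add(block);
        }

        /** Random block from the lowest retry level, or null if none queued. */
        synchronized B next() {
            if (levels.isEmpty()) return null;
            Integer lowest = levels.firstKey();
            List<B> blocks = levels.get(lowest);
            B block = blocks.remove(random.nextInt(blocks.size()));
            if (blocks.isEmpty()) levels.remove(lowest);
            return block;
        }
    }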
Deleted: trunk/freenet/src/freenet/client/RetryTrackerCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/RetryTrackerCallback.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/RetryTrackerCallback.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,21 +0,0 @@
-package freenet.client;
-
-/**
- * Object passed to RetryTracker. This is called when RetryTracker finishes.
- */
-public interface RetryTrackerCallback {
-
- /**
- * Notify the caller that we have finished.
- * @param succeeded The blocks which succeeded.
- * @param failed The blocks which failed.
- * @param fatalErrors The blocks which got fatal errors.
- */
- void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed,
SplitfileBlock[] fatalErrors);
-
- /**
- * Called whenever a block completes, fails, or hits a fatal error.
- */
- void onProgress();
-
-}
Deleted: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/client/Segment.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -1,288 +0,0 @@
-package freenet.client;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.LinkedList;
-import java.util.Vector;
-
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-
-/**
- * A segment, within a splitfile.
- * Self-starting Runnable.
- *
- * Does not require locking, because all locking goes through the parent
SplitFetcher.
- */
-public class Segment implements RetryTrackerCallback {
-
- final short splitfileType;
- final FreenetURI[] dataBlocks;
- final FreenetURI[] checkBlocks;
- final BlockFetcher[] dataBlockStatus;
- final BlockFetcher[] checkBlockStatus;
- final int minFetched;
- private Vector blocksNotTried;
- final SplitFetcher parentFetcher;
- final ArchiveContext archiveContext;
- final FetcherContext fetcherContext;
- final long maxBlockLength;
- final boolean nonFullBlocksAllowed;
- /** Has the segment started to do something? Irreversible. */
- private boolean started;
- /** Has the segment finished processing? Irreversible. */
- private boolean finished;
- /** Bucket to store the data retrieved, after it has been decoded */
- private Bucket decodedData;
- /** Recently completed fetches */
- final LinkedList recentlyCompletedFetches;
- /** Running fetches */
- LinkedList runningFetches;
- /** Fetch context for block fetches */
- final FetcherContext blockFetchContext;
- /** Recursion level */
- final int recursionLevel;
- /** Retry tracker */
- private final RetryTracker tracker;
- private FetchException failureException;
-
- /**
- * Create a Segment.
- * @param splitfileType The type of the splitfile.
- * @param splitfileDataBlocks The data blocks to fetch.
- * @param splitfileCheckBlocks The check blocks to fetch.
- */
- public Segment(short splitfileType, FreenetURI[] splitfileDataBlocks,
FreenetURI[] splitfileCheckBlocks,
- SplitFetcher fetcher, ArchiveContext actx,
FetcherContext fctx, long maxTempLength, boolean useLengths, int recLevel)
throws MetadataParseException {
- this.splitfileType = splitfileType;
- dataBlocks = splitfileDataBlocks;
- checkBlocks = splitfileCheckBlocks;
- if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
- minFetched = dataBlocks.length;
- } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
- minFetched = dataBlocks.length;
- } else throw new MetadataParseException("Unknown splitfile
type "+splitfileType);
- tracker = new RetryTracker(fctx.maxSplitfileBlockRetries,
splitfileDataBlocks.length, fctx.random, fctx.maxSplitfileThreads, false, this,
false);
- // Don't add blocks to tracker yet, because don't want to start
fetch yet.
- parentFetcher = fetcher;
- archiveContext = actx;
- fetcherContext = fctx;
- maxBlockLength = maxTempLength;
- nonFullBlocksAllowed = useLengths;
- started = false;
- finished = false;
- decodedData = null;
- dataBlockStatus = new BlockFetcher[dataBlocks.length];
- checkBlockStatus = new BlockFetcher[checkBlocks.length];
- blocksNotTried = new Vector();
- Vector firstSet = new
Vector(dataBlocks.length+checkBlocks.length);
- blocksNotTried.add(0, firstSet);
- for(int i=0;i<dataBlocks.length;i++) {
- dataBlockStatus[i] = new BlockFetcher(this, tracker,
dataBlocks[i], i, fctx.dontEnterImplicitArchives);
- firstSet.add(dataBlockStatus[i]);
- }
- for(int i=0;i<checkBlocks.length;i++) {
- checkBlockStatus[i] = new BlockFetcher(this, tracker,
checkBlocks[i], dataBlockStatus.length + i, fctx.dontEnterImplicitArchives);
- firstSet.add(checkBlockStatus[i]);
- }
- recentlyCompletedFetches = new LinkedList();
- runningFetches = new LinkedList();
- // FIXME be a bit more flexible here depending on flags
- if(useLengths) {
- blockFetchContext = new FetcherContext(fetcherContext,
FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
- this.recursionLevel = recLevel;
- } else {
- blockFetchContext = new FetcherContext(fetcherContext,
FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
- this.recursionLevel = 0;
- }
- Logger.minor(this, "Created segment: data blocks:
"+dataBlocks.length+", check blocks: "+checkBlocks.length+" "+this);
- }
-
- /**
- * Is the segment finished? (Either error or fetched and decoded)?
- */
- public boolean isFinished() {
- return finished;
- }
-
- /**
- * If there was an error, throw it now.
- */
- public void throwError() throws FetchException {
- if(failureException != null)
- throw failureException;
- }
-
- /**
- * Return the length of the data, after decoding.
- */
- public long decodedLength() {
- return decodedData.size();
- }
-
- /**
- * Write the decoded data to the given output stream.
- * Do not write more than the specified number of bytes (unless it is
negative,
- * in which case ignore it).
- * @return The number of bytes written.
- * @throws IOException If there was an error reading from the bucket
the data is
- * stored in, or writing to the stream provided.
- */
- public long writeDecodedDataTo(OutputStream os, long truncateLength)
throws IOException {
- long len = decodedData.size();
- if(truncateLength >= 0 && truncateLength < len)
- len = truncateLength;
- BucketTools.copyTo(decodedData, os, len);
- return len;
- }
-
- /**
- * Return true if the Segment has been started, otherwise false.
- */
- public boolean isStarted() {
- return started;
- }
-
- /**
- * Start the Segment fetching the data. When it has finished fetching,
it will
- * notify the SplitFetcher. Note that we must not start fetching until
this
- * method is called, because of the requirement to not fetch all
segments
- * simultaneously.
- */
- public void start() {
- started = true;
- for(int i=0;i<dataBlockStatus.length;i++) {
- tracker.addBlock(dataBlockStatus[i]);
- }
- Logger.minor(this, "Added data blocks");
- for(int i=0;i<checkBlockStatus.length;i++) {
- tracker.addBlock(checkBlockStatus[i]);
- }
- tracker.callOnProgress();
- tracker.setFinishOnEmpty();
- }
-
- /**
- * How many fetches are running?
- */
- private int runningFetches() {
- synchronized(runningFetches) {
- return runningFetches.size();
- }
- }
-
- /**
- * Once we have enough data to decode, tell parent, and decode it.
- */
- public void finished(SplitfileBlock[] succeeded, SplitfileBlock[]
failed, SplitfileBlock[] fatalErrors) {
-
- Logger.minor(this, "Finished("+succeeded.length+",
"+failed.length+", "+fatalErrors.length+")");
- parentFetcher.gotBlocks(this);
- if(succeeded.length >= minFetched)
- // Not finished yet, need to decode
- try {
- successfulFetch();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" decoding
"+this);
- finished = true;
- failureException = new
FetchException(FetchException.INTERNAL_ERROR, t);
- parentFetcher.segmentFinished(this);
- }
- else {
- failureException = new
SplitFetchException(failed.length, fatalErrors.length, succeeded.length,
minFetched,
tracker.getAccumulatedNonFatalErrorCodes().merge(tracker.getAccumulatedFatalErrorCodes()));
- finished = true;
- parentFetcher.segmentFinished(this);
- }
- }
-
- /**
- * Successful fetch, do the decode, tell the parent, etc.
- */
- private void successfulFetch() {
-
- // Now decode
- Logger.minor(this, "Decoding "+this);
-
- FECCodec codec = FECCodec.getCodec(splitfileType,
dataBlocks.length, checkBlocks.length);
- try {
- if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
- // FIXME hardcoded block size below.
- codec.decode(dataBlockStatus, checkBlockStatus,
32768, fetcherContext.bucketFactory);
- // Now have all the data blocks (not
necessarily all the check blocks)
- }
-
- decodedData =
fetcherContext.bucketFactory.makeBucket(-1);
- Logger.minor(this, "Copying data from data blocks");
- OutputStream os = decodedData.getOutputStream();
- for(int i=0;i<dataBlockStatus.length;i++) {
- BlockFetcher status = dataBlockStatus[i];
- Bucket data = status.fetchedData;
- BucketTools.copyTo(data, os, Long.MAX_VALUE);
- }
- Logger.minor(this, "Copied data");
- os.close();
- // Must set finished BEFORE calling parentFetcher.
- // Otherwise a race is possible that might result in it
not seeing our finishing.
- finished = true;
- parentFetcher.segmentFinished(this);
- } catch (IOException e) {
- Logger.minor(this, "Caught bucket error?: "+e, e);
- finished = true;
- failureException = new
FetchException(FetchException.BUCKET_ERROR);
- parentFetcher.segmentFinished(this);
- return;
- }
-
- // Now heal
-
- // Encode any check blocks we don't have
- if(codec != null) {
- try {
- codec.encode(dataBlockStatus, checkBlockStatus,
32768, fetcherContext.bucketFactory);
- } catch (IOException e) {
- Logger.error(this, "Bucket error while healing:
"+e, e);
- }
- }
-
- // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
- for(int i=0;i<dataBlockStatus.length;i++) {
- BlockFetcher block = dataBlockStatus[i];
- if(block.actuallyFetched) continue;
- if(block.completedTries == 0) {
- // 20% chance of not inserting, if we never
tried it
- if(fetcherContext.random.nextInt(5) == 0)
continue;
- }
- block.queueHeal();
- }
-
- // FIXME heal check blocks too
- }
-
- public void onProgress() {
- if(fetcherContext.isCancelled()) {
- finished = true;
- tracker.kill();
- failureException = new
FetchException(FetchException.CANCELLED);
- parentFetcher.gotBlocks(this);
- }
- parentFetcher.onProgress();
- }
-
- public int fetchedBlocks() {
- return tracker.succeededBlocksLength();
- }
-
- public int failedBlocks() {
- return tracker.failedBlocks().length;
- }
-
- public int fatallyFailedBlocks() {
- return tracker.fatalErrorBlocks().length;
- }
-
- public int runningBlocks() {
- return tracker.runningBlocks().length;
- }
-}
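successfulFetch() above fixes the ordering of the decode path: once minFetched
blocks have arrived, FEC-decode to recover every data block, concatenate the
data blocks into the output bucket, then re-encode the check blocks and queue
re-inserts ("healing") for blocks the network failed to serve. A schematic of
that ordering; Fec and Block are illustrative stand-ins for the real types:

    // Schematic of the decode-then-heal ordering in successfulFetch().
    import java.io.IOException;
    import java.io.OutputStream;

    interface Fec {
        void decode(Block[] data, Block[] check) throws IOException;
        void encode(Block[] data, Block[] check) throws IOException;
    }

    interface Block {
        void writeTo(OutputStream out) throws IOException;
        boolean wasFetchedDirectly();
        void queueHeal();
    }

    class SegmentDecode {
        static void onEnoughBlocks(Fec codec, Block[] data, Block[] check,
                OutputStream out) throws IOException {
            codec.decode(data, check);          // 1. recover all data blocks
            for (int i = 0; i < data.length; i++)
                data[i].writeTo(out);           // 2. reassemble the payload
            codec.encode(data, check);          // 3. regenerate check blocks
            for (int i = 0; i < data.length; i++)
                if (!data[i].wasFetchedDirectly())
                    data[i].queueHeal();        // 4. re-insert what we rebuilt
        }
    }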
Deleted: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -1,255 +0,0 @@
-package freenet.client;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Vector;
-
-import com.onionnetworks.fec.FECCode;
-import com.onionnetworks.fec.FECCodeFactory;
-
-import freenet.client.events.SplitfileProgressEvent;
-import freenet.keys.FreenetURI;
-import freenet.keys.NodeCHK;
-import freenet.support.Bucket;
-import freenet.support.Fields;
-import freenet.support.Logger;
-
-/**
- * Class to fetch a splitfile.
- */
-public class SplitFetcher {
-
- // 128/192. Crazy, but it's possible we'd get big erasures.
- static final int ONION_STD_K = 128;
- static final int ONION_STD_N = 192;
-
- /** The standard onion codec */
- static FECCode onionStandardCode =
-
FECCodeFactory.getDefault().createFECCode(ONION_STD_K,ONION_STD_N);
-
- /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
- final short splitfileType;
- /** The segment length. -1 means not segmented and must get everything
to decode. */
- final int blocksPerSegment;
- /** The segment length in check blocks. */
- final int checkBlocksPerSegment;
- /** Total number of segments */
- final int segmentCount;
- /** The detailed information on each segment */
- final Segment[] segments;
- /** The splitfile data blocks. */
- final FreenetURI[] splitfileDataBlocks;
- /** The splitfile check blocks. */
- final FreenetURI[] splitfileCheckBlocks;
- /** The archive context */
- final ArchiveContext actx;
- /** The fetch context */
- final FetcherContext fctx;
- /** Maximum temporary length */
- final long maxTempLength;
- /** Have all segments finished? Access synchronized. */
- private boolean allSegmentsFinished = false;
- /** Currently fetching segment */
- private Segment fetchingSegment;
- /** Array of unstarted segments. Modify synchronized. */
- private Vector unstartedSegments;
- /** Override length. If this is positive, truncate the splitfile to
this length. */
- private long overrideLength;
- /** Accept non-full splitfile chunks? */
- private boolean splitUseLengths;
-
-	public SplitFetcher(Metadata metadata, ArchiveContext archiveContext, FetcherContext ctx, int recursionLevel) throws MetadataParseException, FetchException {
-		actx = archiveContext;
-		fctx = ctx;
-		if(fctx.isCancelled()) throw new FetchException(FetchException.CANCELLED);
-		overrideLength = metadata.dataLength;
-		this.maxTempLength = ctx.maxTempLength;
-		splitfileType = metadata.getSplitfileType();
-		splitfileDataBlocks = metadata.getSplitfileDataKeys();
-		splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
-		splitUseLengths = metadata.splitUseLengths;
-		int blockLength = splitUseLengths ? -1 : NodeCHK.BLOCK_SIZE;
-		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
-			// Don't need to do much - just fetch everything and piece it together.
-			blocksPerSegment = -1;
-			checkBlocksPerSegment = -1;
-			segmentCount = 1;
-		} else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
-			byte[] params = metadata.splitfileParams;
-			if(params == null || params.length < 8)
-				throw new MetadataParseException("No splitfile params");
-			blocksPerSegment = Fields.bytesToInt(params, 0);
-			checkBlocksPerSegment = Fields.bytesToInt(params, 4);
-			if(blocksPerSegment > ctx.maxDataBlocksPerSegment || checkBlocksPerSegment > ctx.maxCheckBlocksPerSegment)
-				throw new FetchException(FetchException.TOO_MANY_BLOCKS_PER_SEGMENT, "Too many blocks per segment: "+blocksPerSegment+" data, "+checkBlocksPerSegment+" check");
-			segmentCount = (splitfileDataBlocks.length / blocksPerSegment) + (splitfileDataBlocks.length % blocksPerSegment == 0 ? 0 : 1);
-			// Onion, 128/192.
-			// Will be segmented.
-		} else throw new MetadataParseException("Unknown splitfile format: "+splitfileType);
-		Logger.minor(this, "Algorithm: "+splitfileType+", blocks per segment: "+blocksPerSegment+", check blocks per segment: "+checkBlocksPerSegment+", segments: "+segmentCount);
-		segments = new Segment[segmentCount]; // initially null on all entries
-		if(segmentCount == 1) {
-			segments[0] = new Segment(splitfileType, splitfileDataBlocks, splitfileCheckBlocks, this, archiveContext, ctx, maxTempLength, splitUseLengths, recursionLevel+1);
-		} else {
-			int dataBlocksPtr = 0;
-			int checkBlocksPtr = 0;
-			for(int i=0;i<segments.length;i++) {
-				// Create a segment. Give it its keys.
-				int copyDataBlocks = Math.min(splitfileDataBlocks.length - dataBlocksPtr, blocksPerSegment);
-				int copyCheckBlocks = Math.min(splitfileCheckBlocks.length - checkBlocksPtr, checkBlocksPerSegment);
-				FreenetURI[] dataBlocks = new FreenetURI[copyDataBlocks];
-				FreenetURI[] checkBlocks = new FreenetURI[copyCheckBlocks];
-				if(copyDataBlocks > 0)
-					System.arraycopy(splitfileDataBlocks, dataBlocksPtr, dataBlocks, 0, copyDataBlocks);
-				if(copyCheckBlocks > 0)
-					System.arraycopy(splitfileCheckBlocks, checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
-				dataBlocksPtr += copyDataBlocks;
-				checkBlocksPtr += copyCheckBlocks;
-				segments[i] = new Segment(splitfileType, dataBlocks, checkBlocks, this, archiveContext, ctx, maxTempLength, splitUseLengths, blockLength);
-			}
-		}
-		unstartedSegments = new Vector();
-		for(int i=0;i<segments.length;i++)
-			unstartedSegments.add(segments[i]);
-		Logger.minor(this, "Segments: "+unstartedSegments.size()+", data keys: "+splitfileDataBlocks.length+", check keys: "+(splitfileCheckBlocks==null?0:splitfileCheckBlocks.length));
-	}
-
-	/**
-	 * Fetch the splitfile.
-	 * Fetch one segment, while decoding the previous one.
-	 * Fetch the segments in random order.
-	 * When everything has been fetched and decoded, return the full data.
-	 * @throws FetchException
-	 */
-	public Bucket fetch() throws FetchException {
-		/*
-		 * While(true) {
-		 * Pick a random segment, start it fetching.
-		 * Wait for a segment to finish fetching, a segment to finish decoding, or an error.
-		 * If a segment finishes fetching:
-		 * Continue to start another one if there are any left
-		 * If a segment finishes decoding:
-		 * If all segments are decoded, assemble all the segments and return the data.
-		 *
-		 * Segments are expected to automatically start decoding when they finish fetching,
-		 * but to tell us either way.
-		 */
-		while(true) {
-			synchronized(this) {
-				if(fctx.isCancelled()) throw new FetchException(FetchException.CANCELLED);
-				if(fetchingSegment == null) {
-					// Pick a random segment
-					fetchingSegment = chooseUnstartedSegment();
-					if(fetchingSegment == null) {
-						// All segments have started
-					} else {
-						fetchingSegment.start();
-					}
-				}
-				if(allSegmentsFinished) {
-					return finalStatus();
-				}
-				try {
-					wait(10*1000); // or wait()?
-				} catch (InterruptedException e) {
-					// Ignore
-				}
-			}
-		}
-	}
-
- private Segment chooseUnstartedSegment() {
- synchronized(unstartedSegments) {
- if(unstartedSegments.isEmpty()) return null;
- int x = fctx.random.nextInt(unstartedSegments.size());
- Logger.minor(this, "Starting segment "+x+" of
"+unstartedSegments.size());
- Segment s = (Segment) unstartedSegments.remove(x);
- return s;
- }
- }
-
-	/** Return the final status of the fetch. Throws an exception, or returns a
- * Bucket containing the fetched data.
- * @throws FetchException If the fetch failed for some reason.
- */
- private Bucket finalStatus() throws FetchException {
- long finalLength = 0;
- for(int i=0;i<segments.length;i++) {
- Segment s = segments[i];
-			if(!s.isFinished()) throw new IllegalStateException("Not all finished");
- s.throwError();
- // If still here, it succeeded
- finalLength += s.decodedLength();
- // Healing is done by Segment
- }
- if(finalLength > overrideLength)
- finalLength = overrideLength;
-
- long bytesWritten = 0;
- OutputStream os = null;
- Bucket output;
- try {
- output = fctx.bucketFactory.makeBucket(finalLength);
- os = output.getOutputStream();
- for(int i=0;i<segments.length;i++) {
- Segment s = segments[i];
-				long max = (finalLength < 0 ? 0 : (finalLength - bytesWritten));
- bytesWritten += s.writeDecodedDataTo(os, max);
- }
- } catch (IOException e) {
-			throw new FetchException(FetchException.BUCKET_ERROR, e);
- } finally {
- if(os != null) {
- try {
- os.close();
- } catch (IOException e) {
-					// If it fails to close it may return corrupt data.
-					throw new FetchException(FetchException.BUCKET_ERROR, e);
- }
- }
- }
- return output;
- }
-
- public void gotBlocks(Segment segment) {
- Logger.minor(this, "Got blocks for segment: "+segment);
- synchronized(this) {
- fetchingSegment = null;
- notifyAll();
- }
- }
-
- public void segmentFinished(Segment segment) {
- Logger.minor(this, "Finished segment: "+segment);
- synchronized(this) {
- boolean allDone = true;
- for(int i=0;i<segments.length;i++)
- if(!segments[i].isFinished()) {
- Logger.minor(this, "Segment
"+segments[i]+" is not finished");
- allDone = false;
- }
- if(allDone) allSegmentsFinished = true;
- notifyAll();
- }
- }
-
- public void onProgress() {
- int totalBlocks = splitfileDataBlocks.length;
- int fetchedBlocks = 0;
- int failedBlocks = 0;
- int fatallyFailedBlocks = 0;
- int runningBlocks = 0;
- for(int i=0;i<segments.length;i++) {
- Logger.minor(this, "Segment: "+segments[i]+":
fetched="+segments[i].fetchedBlocks()+", failedBlocks:
"+segments[i].failedBlocks()+
- ", fatally:
"+segments[i].fatallyFailedBlocks()+", running: "+segments[i].runningBlocks());
- fetchedBlocks += segments[i].fetchedBlocks();
- failedBlocks += segments[i].failedBlocks();
- fatallyFailedBlocks +=
segments[i].fatallyFailedBlocks();
- runningBlocks += segments[i].runningBlocks();
- }
- fctx.eventProducer.produceEvent(new
SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks,
fatallyFailedBlocks, runningBlocks));
- }
-
-}
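[The deleted constructor above carves the splitfile's key list into segments: segmentCount is a ceiling division of the data-block count by blocksPerSegment, and System.arraycopy hands each segment its slice of keys, with the final segment taking whatever remains. A minimal self-contained sketch of that arithmetic follows; the class and method names here are invented for illustration and are not part of the Freenet sources.]

import java.util.Arrays;

// Sketch of the segment-partitioning arithmetic in the deleted
// SplitFetcher constructor. String stands in for FreenetURI.
public class SegmentPartitionSketch {

	static String[][] partition(String[] keys, int blocksPerSegment) {
		// Ceiling division, exactly as in the deleted code:
		// (length / blocksPerSegment) + (length % blocksPerSegment == 0 ? 0 : 1)
		int segmentCount = (keys.length / blocksPerSegment)
				+ (keys.length % blocksPerSegment == 0 ? 0 : 1);
		String[][] segments = new String[segmentCount][];
		int ptr = 0;
		for(int i = 0; i < segmentCount; i++) {
			// The last segment may be shorter than blocksPerSegment.
			int copy = Math.min(keys.length - ptr, blocksPerSegment);
			segments[i] = new String[copy];
			System.arraycopy(keys, ptr, segments[i], 0, copy);
			ptr += copy;
		}
		return segments;
	}

	public static void main(String[] args) {
		String[] keys = new String[5];
		for(int i = 0; i < keys.length; i++) keys[i] = "key" + i;
		// 5 keys, 2 per segment -> segments of sizes 2, 2, 1.
		System.out.println(Arrays.deepToString(partition(keys, 2)));
	}
}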
Deleted: trunk/freenet/src/freenet/client/SplitInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitInserter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/SplitInserter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,302 +0,0 @@
-package freenet.client;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Vector;
-
-import freenet.client.events.GeneratedURIEvent;
-import freenet.client.events.SplitfileProgressEvent;
-import freenet.keys.FreenetURI;
-import freenet.keys.NodeCHK;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.compress.Compressor;
-
-/**
- * Insert a splitfile.
- */
-public class SplitInserter implements RetryTrackerCallback {
-
- final Bucket origData;
- final long dataLength;
- final ClientMetadata clientMetadata;
- final short compressionCodec;
- final short splitfileAlgorithm;
- final InserterContext ctx;
- final RetryTracker tracker;
- final int segmentSize;
- final int checkSegmentSize;
- final int blockSize;
- final boolean isMetadata;
- final Bucket returnMetadata;
- SplitfileBlock[] origDataBlocks;
- InsertSegment encodingSegment;
- InsertSegment[] segments;
- private boolean finishedInserting = false;
- private boolean getCHKOnly;
- private int succeeded;
- private int failed;
- private int fatalErrors;
- private int countCheckBlocks;
- private SplitfileBlock[] fatalErrorBlocks;
- private FileInserter inserter;
-
-	/**
-	 * @param returnMetadata If not null, then write the metadata to this bucket,
-	 * rather than inserting it.
-	 */
-	public SplitInserter(Bucket data, ClientMetadata clientMetadata, Compressor compressor, short splitfileAlgorithm, InserterContext ctx, FileInserter inserter, int blockLength, boolean getCHKOnly, boolean isMetadata, Bucket returnMetadata) throws InserterException {
- this.origData = data;
- this.getCHKOnly = getCHKOnly;
- this.blockSize = blockLength;
- this.clientMetadata = clientMetadata;
- if(compressor == null)
- compressionCodec = -1;
- else
- compressionCodec = compressor.codecNumberForMetadata();
- this.splitfileAlgorithm = splitfileAlgorithm;
- this.ctx = ctx;
- this.dataLength = data.size();
- segmentSize = ctx.splitfileSegmentDataBlocks;
-		checkSegmentSize = splitfileAlgorithm == Metadata.SPLITFILE_NONREDUNDANT ? 0 : ctx.splitfileSegmentCheckBlocks;
-		tracker = new RetryTracker(ctx.maxInsertRetries, Integer.MAX_VALUE, ctx.random, ctx.maxSplitInsertThreads, true, this, true);
-		try {
-			splitIntoBlocks();
-		} catch (IOException e) {
-			throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-		}
- this.inserter = inserter;
- this.isMetadata = isMetadata;
- this.returnMetadata = returnMetadata;
- }
-
- /**
- * Inserts the splitfile.
- * @return The URI of the resulting file.
- * @throws InserterException If we are not able to insert the splitfile.
- */
- public FreenetURI run() throws InserterException {
- try {
- startInsertingDataBlocks();
- splitIntoSegments(segmentSize);
- // Backwards, because the last is the shortest
- try {
-				for(int i=segments.length-1;i>=0;i--) {
-					encodeSegment(i, origDataBlocks.length + checkSegmentSize * i);
-					Logger.minor(this, "Encoded segment "+i+" of "+segments.length);
-				}
-			} catch (IOException e) {
-				throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-			}
-			// Wait for the insertion thread to finish
-			return waitForCompletion();
-		} catch (Throwable t) {
-			Logger.error(this, "Caught "+t, t);
-			tracker.kill();
-			if(t instanceof InserterException) throw (InserterException)t;
-			throw new InserterException(InserterException.INTERNAL_ERROR, t, null);
-		}
-	}
-
- private FreenetURI waitForCompletion() throws InserterException {
- tracker.setFinishOnEmpty();
- synchronized(this) {
- while(!finishedInserting) {
- try {
- wait(10*1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- }
-
-		// Create the manifest (even if we failed, so that the key is visible)
-
-		FreenetURI[] dataURIs = getDataURIs();
-		FreenetURI[] checkURIs = getCheckURIs();
-
-		Logger.minor(this, "Data URIs: "+dataURIs.length+", check URIs: "+checkURIs.length);
-
-		boolean missingURIs = anyNulls(dataURIs) || anyNulls(checkURIs);
-
-		if(missingURIs && fatalErrors == 0 && failed == 0)
-			throw new IllegalStateException();
-
-		FreenetURI uri = null;
-
-		if(!missingURIs) {
-
-			Metadata metadata = new Metadata(splitfileAlgorithm, dataURIs, checkURIs, segmentSize, checkSegmentSize, clientMetadata, dataLength, compressionCodec, isMetadata);
-
-			if(returnMetadata != null) {
-				DataOutputStream dos;
-				try {
-					dos = new DataOutputStream(returnMetadata.getOutputStream());
-					metadata.writeTo(dos);
-					dos.close();
-				} catch (IOException e) {
-					throw new InserterException(InserterException.BUCKET_ERROR);
-				}
-			} else {
-
-				Bucket mbucket;
-				try {
-					mbucket = BucketTools.makeImmutableBucket(ctx.bf, metadata.writeToByteArray());
-				} catch (IOException e) {
-					throw new InserterException(InserterException.BUCKET_ERROR, null);
-				}
-
-				if(inserter == null)
-					inserter = new FileInserter(ctx);
-
-				InsertBlock mblock = new InsertBlock(mbucket, null, FreenetURI.EMPTY_CHK_URI);
-
-				// FIXME probably should uncomment below so it doesn't get inserted at all?
-				// FIXME this is a hack for small network support... but we will need that IRL... hmmm
-				try {
-					uri = inserter.run(mblock, true, getCHKOnly/* || (fatalErrors > 0 || failed > 0)*/, false, null);
-				} catch (InserterException e) {
-					e.errorCodes = tracker.getAccumulatedNonFatalErrorCodes().merge(tracker.getAccumulatedFatalErrorCodes());
-					throw e;
-				}
-			}
-
-
-		}
-		// Did we succeed?
-
-		if(uri != null)
-			ctx.eventProducer.produceEvent(new GeneratedURIEvent(uri));
-
-		if(fatalErrors > 0) {
-			throw new InserterException(InserterException.FATAL_ERRORS_IN_BLOCKS, tracker.getAccumulatedFatalErrorCodes(), uri);
-		}
-
-		if(failed > 0) {
-			throw new InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, tracker.getAccumulatedNonFatalErrorCodes(), uri);
-		}
-
-		return uri;
-	}
-
- // FIXME move this to somewhere
- private static boolean anyNulls(Object[] array) {
- for(int i=0;i<array.length;i++)
- if(array[i] == null) return true;
- return false;
- }
-
- private FreenetURI[] getCheckURIs() {
- // Copy check blocks from each segment into a FreenetURI[].
- FreenetURI[] uris = new FreenetURI[countCheckBlocks];
- int x = 0;
- for(int i=0;i<segments.length;i++) {
- FreenetURI[] segURIs = segments[i].getCheckURIs();
-			if(x + segURIs.length > countCheckBlocks)
-				throw new IllegalStateException("x="+x+", segURIs="+segURIs.length+", countCheckBlocks="+countCheckBlocks);
- System.arraycopy(segURIs, 0, uris, x, segURIs.length);
- x += segURIs.length;
- }
-
- if(uris.length != x)
- throw new IllegalStateException("Total is wrong");
-
- return uris;
- }
-
- private FreenetURI[] getDataURIs() {
- FreenetURI[] uris = new FreenetURI[origDataBlocks.length];
- for(int i=0;i<uris.length;i++)
- uris[i] = origDataBlocks[i].getURI();
- return uris;
- }
-
- private int encodeSegment(int i, int offset) throws IOException {
- encodingSegment = segments[i];
- return encodingSegment.encode(offset, tracker, ctx);
- }
-
- /**
- * Start the insert, by adding all the data blocks.
- */
- private void startInsertingDataBlocks() {
- for(int i=0;i<origDataBlocks.length;i++)
- tracker.addBlock(origDataBlocks[i]);
- tracker.callOnProgress();
- }
-
- /**
- * Split blocks into segments for encoding.
- * @throws IOException If there is a bucket error encoding the file.
- */
- private void splitIntoBlocks() throws IOException {
-		Bucket[] dataBuckets = BucketTools.split(origData, NodeCHK.BLOCK_SIZE, ctx.bf);
-		origDataBlocks = new SplitfileBlock[dataBuckets.length];
-		for(int i=0;i<origDataBlocks.length;i++) {
-			origDataBlocks[i] = new BlockInserter(dataBuckets[i], i, tracker, ctx, getCHKOnly);
-			if(origDataBlocks[i].getData() == null)
-				throw new NullPointerException("Block "+i+" of "+dataBuckets.length+" is null");
- }
- }
-
- /**
- * Group the blocks into segments.
- */
- private void splitIntoSegments(int segmentSize) {
- int dataBlocks = origDataBlocks.length;
-
- Vector segs = new Vector();
-
- // First split the data up
- if(dataBlocks < segmentSize || segmentSize == -1) {
- // Single segment
-			InsertSegment onlySeg = new InsertSegment(splitfileAlgorithm, origDataBlocks, blockSize, ctx.bf, getCHKOnly, 0);
-			countCheckBlocks = onlySeg.checkBlocks.length;
-			segs.add(onlySeg);
-		} else {
-			int j = 0;
-			int segNo = 0;
-			for(int i=segmentSize;;i+=segmentSize) {
-				if(i > dataBlocks) i = dataBlocks;
-				SplitfileBlock[] seg = new SplitfileBlock[i-j];
-				System.arraycopy(origDataBlocks, j, seg, 0, i-j);
-				j = i;
-				for(int x=0;x<seg.length;x++)
-					if(seg[x].getData() == null) throw new NullPointerException("In splitIntoSegs: "+x+" is null of "+seg.length+" of "+segNo);
-				InsertSegment s = new InsertSegment(splitfileAlgorithm, seg, blockSize, ctx.bf, getCHKOnly, segNo);
-				countCheckBlocks += s.checkBlocks.length;
-				segs.add(s);
-
-				if(i == dataBlocks) break;
-				segNo++;
-			}
-		}
-		segments = (InsertSegment[]) segs.toArray(new InsertSegment[segs.size()]);
- }
-
-	public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors) {
- synchronized(this) {
- finishedInserting = true;
- this.succeeded = succeeded.length;
- this.failed = failed.length;
- this.fatalErrorBlocks = fatalErrors;
- this.fatalErrors = fatalErrorBlocks.length;
- notify();
- }
- }
-
- public void onProgress() {
- /* What info to report?
- * - Total number of blocks to insert.
- * -
- */
- int totalBlocks = origDataBlocks.length + countCheckBlocks;
- int fetchedBlocks = tracker.succeededBlocks().length;
- int failedBlocks = tracker.countFailedBlocks();
- int fatallyFailedBlocks = tracker.fatalErrorBlocks().length;
- int runningBlocks = tracker.runningBlocks().length;
-		ctx.eventProducer.produceEvent(new SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks, fatallyFailedBlocks, runningBlocks));
- }
-
-}
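[waitForCompletion() above uses the classic Java monitor handshake: the inserting thread waits on `this` with a timeout until the RetryTracker's finished() callback sets a flag and calls notify(). A condensed, self-contained sketch of that handshake follows; the class and method names are invented, not taken from the Freenet sources.]

// Minimal sketch of the wait/notify completion handshake used by
// waitForCompletion()/finished() above. All names here are invented.
public class CompletionLatchSketch {

	private boolean finished = false;

	// Called by a worker when all blocks are done (cf. finished()).
	public synchronized void markFinished() {
		finished = true;
		notify(); // wake the thread blocked in awaitCompletion()
	}

	// Called by the initiating thread (cf. waitForCompletion()).
	public synchronized void awaitCompletion() {
		while(!finished) {
			try {
				wait(10*1000); // periodic wake-up, as in the original
			} catch (InterruptedException e) {
				// Ignore, matching the original's behaviour
			}
		}
	}

	public static void main(String[] args) {
		final CompletionLatchSketch latch = new CompletionLatchSketch();
		new Thread(new Runnable() {
			public void run() {
				try { Thread.sleep(500); } catch (InterruptedException e) {}
				latch.markFinished();
			}
		}).start();
		latch.awaitCompletion();
		System.out.println("completed");
	}
}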
Modified: trunk/freenet/src/freenet/client/SplitfileBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitfileBlock.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/SplitfileBlock.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,11 +1,9 @@
package freenet.client;
-import freenet.client.RetryTracker.Level;
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
-/** Simple interface for a splitfile block */
-public abstract class SplitfileBlock {
+public interface SplitfileBlock {
/** Get block number. [0,k[ = data blocks, [k, n[ = check blocks */
abstract int getNumber();
@@ -19,20 +17,5 @@
/** Set data */
abstract void setData(Bucket data);
-	/** Start the fetch (or insert). Implementation is required to call relevant
- * methods on RetryTracker when done. */
- abstract void start();
- /**
- * Shut down the fetch as soon as reasonably possible.
- */
- abstract public void kill();
-
- /**
-	 * Get the URI of the file. For an insert, this is derived during insert.
- * For a request, it is fixed in the constructor.
- */
- abstract public FreenetURI getURI();
-
- abstract public int getRetryCount();
}
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -8,7 +8,6 @@
import com.onionnetworks.fec.DefaultFECCodeFactory;
import com.onionnetworks.fec.FECCode;
-import com.onionnetworks.fec.PureCode;
import com.onionnetworks.util.Buffer;
import freenet.support.Bucket;
@@ -24,7 +23,7 @@
public class Encoder implements Runnable {
-		private final SplitfileBlock[] dataBlockStatus, checkBlockStatus;
+ private final Bucket[] dataBlockStatus, checkBlockStatus;
private final int blockLength;
private final BucketFactory bf;
private IOException thrownIOE;
@@ -32,7 +31,7 @@
private Error thrownError;
private boolean finished;
-		public Encoder(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+		public Encoder(Bucket[] dataBlockStatus, Bucket[] checkBlockStatus, int blockLength, BucketFactory bf) {
this.dataBlockStatus = dataBlockStatus;
this.checkBlockStatus = checkBlockStatus;
this.blockLength = blockLength;
@@ -235,7 +234,7 @@
if (idx < k)
throw new IllegalArgumentException(
- "Must have at least k packets");
+ "Must have at least k packets
(k="+k+",idx="+idx+")");
for (int i = 0; i < packetIndexes.length; i++)
Logger.minor(this, "[" + i + "] = " +
packetIndexes[i]);
@@ -288,7 +287,7 @@
}
}
-	public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+	public void encode(Bucket[] dataBlockStatus, Bucket[] checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
// Encodes count as decodes.
synchronized(runningDecodesSync) {
while(runningDecodes >= PARALLEL_DECODES) {
@@ -324,12 +323,26 @@
}
}
}
+
+	public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+ Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
+ Bucket[] checkBlocks = new Bucket[checkBlockStatus.length];
+ for(int i=0;i<dataBlocks.length;i++)
+ dataBlocks[i] = dataBlockStatus[i].getData();
+ for(int i=0;i<checkBlocks.length;i++)
+ checkBlocks[i] = checkBlockStatus[i].getData();
+ encode(dataBlocks, checkBlocks, blockLength, bf);
+ for(int i=0;i<dataBlocks.length;i++)
+ dataBlockStatus[i].setData(dataBlocks[i]);
+ for(int i=0;i<checkBlocks.length;i++)
+ checkBlockStatus[i].setData(checkBlocks[i]);
+ }
/**
* Do the actual encode.
*/
-	private void realEncode(SplitfileBlock[] dataBlockStatus,
-			SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf)
+	private void realEncode(Bucket[] dataBlockStatus,
+			Bucket[] checkBlockStatus, int blockLength, BucketFactory bf)
throws IOException {
// Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
@@ -366,7 +379,7 @@
STRIPE_SIZE);
for (int i = 0; i < dataBlockStatus.length; i++) {
- buckets[i] = dataBlockStatus[i].getData();
+ buckets[i] = dataBlockStatus[i];
long sz = buckets[i].size();
if (sz < blockLength) {
if (i != dataBlockStatus.length - 1)
@@ -382,7 +395,7 @@
}
for (int i = 0; i < checkBlockStatus.length; i++) {
- buckets[i + k] = checkBlockStatus[i].getData();
+ buckets[i + k] = checkBlockStatus[i];
if (buckets[i + k] == null) {
 				buckets[i + k] = bf.makeBucket(blockLength);
 				writers[i] = buckets[i + k].getOutputStream();
@@ -455,7 +468,7 @@
Bucket data = buckets[i + k];
if (data == null)
throw new NullPointerException();
- checkBlockStatus[i].setData(data);
+ checkBlockStatus[i] = data;
}
}
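[The hunks above narrow the codec's dependency: realEncode() now works on raw Bucket payloads, and a thin SplitfileBlock overload unwraps getData()/setData() around the Bucket-based encode(). The same unwrap/call/write-back shape, reduced to a standalone sketch; "Holder" and "process" are invented names standing in for SplitfileBlock and encode().]

// Standalone sketch of the adapter overload introduced above.
public class AdapterSketch {

	// Plays the role of SplitfileBlock (name invented).
	static class Holder {
		String data;
		String getData() { return data; }
		void setData(String data) { this.data = data; }
	}

	// Core operation on raw payloads (cf. the new encode(Bucket[], ...)).
	static void process(String[] payloads) {
		for(int i = 0; i < payloads.length; i++)
			if(payloads[i] == null)
				payloads[i] = "generated-" + i; // fill missing entries, like check blocks
	}

	// Compatibility overload (cf. the retained encode(SplitfileBlock[], ...)).
	static void process(Holder[] holders) {
		String[] payloads = new String[holders.length];
		for(int i = 0; i < holders.length; i++)
			payloads[i] = holders[i].getData();
		process(payloads);
		// Write back: process() may have replaced null entries.
		for(int i = 0; i < holders.length; i++)
			holders[i].setData(payloads[i]);
	}

	public static void main(String[] args) {
		Holder[] holders = new Holder[2];
		holders[0] = new Holder();
		holders[0].setData("existing");
		holders[1] = new Holder(); // starts with null data
		process(holders);
		System.out.println(holders[0].getData() + ", " + holders[1].getData());
	}
}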
Copied: trunk/freenet/src/freenet/client/StartableSplitfileBlock.java (from rev 7929, branches/async-client/src/freenet/client/StartableSplitfileBlock.java)
Deleted: trunk/freenet/src/freenet/client/StdSplitfileBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/StdSplitfileBlock.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/StdSplitfileBlock.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,54 +0,0 @@
-package freenet.client;
-
-import freenet.support.Bucket;
-import freenet.support.Logger;
-
-public abstract class StdSplitfileBlock extends SplitfileBlock implements Runnable {
-
- Bucket fetchedData;
- protected final RetryTracker tracker;
-	/** Splitfile index - [0,k[ is the data blocks, [k,n[ is the check blocks */
- protected final int index;
-
-	public StdSplitfileBlock(RetryTracker tracker2, int index2, Bucket data) {
- if(tracker2 == null) throw new NullPointerException();
- this.tracker = tracker2;
- this.index = index2;
- this.fetchedData = data;
- }
-
- public int getNumber() {
- return index;
- }
-
- public boolean hasData() {
- return fetchedData != null;
- }
-
- public Bucket getData() {
- return fetchedData;
- }
-
- public void setData(Bucket data) {
- if(data == fetchedData) return;
- fetchedData = data;
- Logger.minor(this, "Set data: "+(data == null ? "(null)" :
(""+data.size())+ " on "+this), new Exception("debug"));
- }
-
- public void start() {
- checkStartable();
- Logger.minor(this, "Starting "+this);
- try {
- Thread t = new Thread(this, getName());
- t.setDaemon(true);
- t.start();
- } catch (Throwable error) {
-			tracker.fatalError(this, InserterException.INTERNAL_ERROR);
-			Logger.error(this, "Caught "+error+" creating thread for "+this);
- }
- }
-
- public abstract String getName();
-
- protected abstract void checkStartable();
-}
Copied: trunk/freenet/src/freenet/client/async (from rev 7929, branches/async-client/src/freenet/client/async)
Deleted: trunk/freenet/src/freenet/client/async/BaseClientPutter.java
===================================================================
--- branches/async-client/src/freenet/client/async/BaseClientPutter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/BaseClientPutter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,9 +0,0 @@
-package freenet.client.async;
-
-public abstract class BaseClientPutter extends ClientRequest {
-
-	protected BaseClientPutter(short priorityClass, ClientRequestScheduler scheduler) {
- super(priorityClass, scheduler);
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/BaseClientPutter.java (from rev 7929, branches/async-client/src/freenet/client/async/BaseClientPutter.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientCallback.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientCallback.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,25 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-import freenet.client.InserterException;
-import freenet.keys.FreenetURI;
-
-/**
- * A client process. Something that initiates requests, and can cancel
- * them. FCP, Fproxy, and the GlobalPersistentClient, implement this
- * somewhere.
- */
-public interface ClientCallback {
-
- public void onSuccess(FetchResult result, ClientGetter state);
-
- public void onFailure(FetchException e, ClientGetter state);
-
- public void onSuccess(BaseClientPutter state);
-
- public void onFailure(InserterException e, BaseClientPutter state);
-
- public void onGeneratedURI(FreenetURI uri, BaseClientPutter state);
-
-}
Copied: trunk/freenet/src/freenet/client/async/ClientCallback.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientCallback.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGetState.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientGetState.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,15 +0,0 @@
-package freenet.client.async;
-
-/**
- * A ClientGetState.
- * Represents a stage in the fetch process.
- */
-public abstract class ClientGetState {
-
- public abstract ClientGetter getParent();
-
- public abstract void schedule();
-
- public abstract void cancel();
-
-}
Copied: trunk/freenet/src/freenet/client/async/ClientGetState.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientGetState.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGetter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientGetter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,88 +0,0 @@
-package freenet.client.async;
-
-import java.net.MalformedURLException;
-
-import freenet.client.ArchiveContext;
-import freenet.client.ClientMetadata;
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-import freenet.client.FetcherContext;
-import freenet.keys.FreenetURI;
-
-/**
- * A high level data request.
- */
-public class ClientGetter extends ClientRequest implements GetCompletionCallback {
-
- final ClientCallback client;
- final FreenetURI uri;
- final FetcherContext ctx;
- final ArchiveContext actx;
- ClientGetState currentState;
- private boolean finished;
- private int archiveRestarts;
-
-	public ClientGetter(ClientCallback client, ClientRequestScheduler sched, FreenetURI uri, FetcherContext ctx, short priorityClass) {
- super(priorityClass, sched);
- this.client = client;
- this.uri = uri;
- this.ctx = ctx;
- this.finished = false;
- this.actx = new ArchiveContext();
- archiveRestarts = 0;
- }
-
- public void start() throws FetchException {
- try {
-			currentState = new SingleFileFetcher(this, this, new ClientMetadata(), uri, ctx, actx, getPriorityClass(), 0, false, null);
- currentState.schedule();
- } catch (MalformedURLException e) {
- throw new FetchException(FetchException.INVALID_URI, e);
- }
- }
-
- public void onSuccess(FetchResult result, ClientGetState state) {
- finished = true;
- currentState = null;
- client.onSuccess(result, this);
- }
-
- public void onFailure(FetchException e, ClientGetState state) {
- while(true) {
- if(e.mode == FetchException.ARCHIVE_RESTART) {
- archiveRestarts++;
- if(archiveRestarts > ctx.maxArchiveRestarts)
-					e = new FetchException(FetchException.TOO_MANY_ARCHIVE_RESTARTS);
- else {
- try {
- start();
- } catch (FetchException e1) {
- e = e1;
- continue;
- }
- return;
- }
- }
- finished = true;
- client.onFailure(e, this);
- return;
- }
- }
-
- public void cancel() {
- synchronized(this) {
- super.cancel();
- if(currentState != null)
- currentState.cancel();
- }
- }
-
- public boolean isFinished() {
- return finished || cancelled;
- }
-
- public FreenetURI getURI() {
- return uri;
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/ClientGetter.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientGetter.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientPutState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientPutState.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientPutState.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,14 +0,0 @@
-package freenet.client.async;
-
-/**
- * ClientPutState
- *
- * Represents a state in the insert process.
- */
-public interface ClientPutState {
-
- public abstract BaseClientPutter getParent();
-
- public abstract void cancel();
-
-}
Copied: trunk/freenet/src/freenet/client/async/ClientPutState.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientPutState.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientPutter.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientPutter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientPutter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,95 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.ClientMetadata;
-import freenet.client.InsertBlock;
-import freenet.client.InserterContext;
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.keys.ClientKey;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.Logger;
-
-public class ClientPutter extends BaseClientPutter implements PutCompletionCallback {
-
- final ClientCallback client;
- final Bucket data;
- final FreenetURI targetURI;
- final ClientMetadata cm;
- final InserterContext ctx;
- private ClientPutState currentState;
- private boolean finished;
- private final boolean getCHKOnly;
- private final boolean isMetadata;
- private FreenetURI uri;
-
-	public ClientPutter(ClientCallback client, Bucket data, FreenetURI targetURI, ClientMetadata cm, InserterContext ctx,
-			ClientRequestScheduler scheduler, short priorityClass, boolean getCHKOnly, boolean isMetadata) {
- super(priorityClass, scheduler);
- this.cm = cm;
- this.isMetadata = isMetadata;
- this.getCHKOnly = getCHKOnly;
- this.client = client;
- this.data = data;
- this.targetURI = targetURI;
- this.ctx = ctx;
- this.finished = false;
- this.cancelled = false;
- }
-
- public void start() throws InserterException {
- try {
- currentState =
-				new SingleFileInserter(this, this, new InsertBlock(data, cm, targetURI), isMetadata, ctx, false, false, getCHKOnly, false);
- ((SingleFileInserter)currentState).start();
- } catch (InserterException e) {
- finished = true;
- currentState = null;
- }
- }
-
- public void onSuccess(ClientPutState state) {
- finished = true;
- currentState = null;
- client.onSuccess(this);
- }
-
- public void onFailure(InserterException e, ClientPutState state) {
- finished = true;
- currentState = null;
- client.onFailure(e, this);
- }
-
- public void onEncode(ClientKey key, ClientPutState state) {
- this.uri = key.getURI();
- client.onGeneratedURI(uri, this);
- }
-
- public void cancel() {
- synchronized(this) {
- super.cancel();
- if(currentState != null)
- currentState.cancel();
- }
- }
-
- public boolean isFinished() {
- return finished || cancelled;
- }
-
- public FreenetURI getURI() {
- return uri;
- }
-
-	public synchronized void onTransition(ClientPutState oldState, ClientPutState newState) {
- if(currentState == oldState)
- currentState = newState;
- else
- Logger.error(this, "onTransition: cur="+currentState+",
old="+oldState+", new="+newState);
- }
-
- public void onMetadata(Metadata m, ClientPutState state) {
- Logger.error(this, "Got metadata on "+this+" from "+state+"
(this means the metadata won't be inserted)");
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/ClientPutter.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientPutter.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequest.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientRequest.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,37 +0,0 @@
-package freenet.client.async;
-
-import freenet.keys.FreenetURI;
-
-/** A high level client request. A request (either fetch or put) started
- * by a Client. Has a suitable context and a URI; is fulfilled only when
- * we have followed all the redirects etc, or have an error. Can be
- * retried.
- */
-public abstract class ClientRequest {
-
- // FIXME move the priority classes from RequestStarter here
- protected short priorityClass;
- protected boolean cancelled;
- final ClientRequestScheduler scheduler;
-
- public short getPriorityClass() {
- return priorityClass;
- }
-
-	protected ClientRequest(short priorityClass, ClientRequestScheduler scheduler) {
- this.priorityClass = priorityClass;
- this.scheduler = scheduler;
- }
-
- public void cancel() {
- cancelled = true;
- }
-
- public boolean isCancelled() {
- return cancelled;
- }
-
- public abstract FreenetURI getURI();
-
- public abstract boolean isFinished();
-}
Copied: trunk/freenet/src/freenet/client/async/ClientRequest.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientRequest.java)
Deleted: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequestScheduler.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,143 +0,0 @@
-package freenet.client.async;
-
-import freenet.crypt.RandomSource;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.KeyVerifyException;
-import freenet.node.LowLevelGetException;
-import freenet.node.Node;
-import freenet.node.RequestStarter;
-import freenet.support.Logger;
-import freenet.support.RandomGrabArrayWithInt;
-import freenet.support.SortedVectorByNumber;
-
-/**
- * Every X seconds, the RequestSender calls the ClientRequestScheduler to
- * ask for a request to start. A request is then started, in its own
- * thread. It is removed at that point.
- */
-public class ClientRequestScheduler implements RequestScheduler {
-
- /**
- * Structure:
- * array (by priority) -> // one element per possible priority
-	 * SortedVectorByNumber (by # retries) -> // contains each current #retries
-	 * RandomGrabArray // contains each element, allows fast fetch-and-drop-a-random-element
-	 *
-	 * To speed up fetching, a RGA or SVBN must only exist if it is non-empty.
- */
- final SortedVectorByNumber[] priorities;
- // we have one for inserts and one for requests
- final boolean isInsertScheduler;
- final RandomSource random;
- private final RequestStarter starter;
- private final Node node;
-
-	public ClientRequestScheduler(boolean forInserts, RandomSource random, RequestStarter starter, Node node) {
- this.starter = starter;
- this.random = random;
- this.node = node;
- this.isInsertScheduler = forInserts;
-		priorities = new SortedVectorByNumber[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
- }
-
- public void register(SendableRequest req) {
- Logger.minor(this, "Registering "+req, new Exception("debug"));
- if((!isInsertScheduler) && req instanceof ClientPutter)
- throw new IllegalArgumentException("Expected a
ClientPut: "+req);
- if(req instanceof SendableGet) {
- SendableGet getter = (SendableGet)req;
- ClientKeyBlock block;
- try {
- block = node.fetchKey(getter.getKey());
- } catch (KeyVerifyException e) {
- // Verify exception, probably bogus at source;
- // verifies at low-level, but not at decode.
- getter.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED));
- return;
- }
- if(block != null) {
- getter.onSuccess(block);
- return;
- }
- }
- synchronized(this) {
- RandomGrabArrayWithInt grabber =
- makeGrabArray(req.getPriorityClass(),
req.getRetryCount());
- grabber.add(req);
- Logger.minor(this, "Registered "+req+" on
prioclass="+req.getPriorityClass()+", retrycount="+req.getRetryCount());
- }
- synchronized(starter) {
- starter.notifyAll();
- }
- }
-
-	private synchronized RandomGrabArrayWithInt makeGrabArray(short priorityClass, int retryCount) {
-		SortedVectorByNumber prio = priorities[priorityClass];
-		if(prio == null) {
-			prio = new SortedVectorByNumber();
-			priorities[priorityClass] = prio;
-		}
-		RandomGrabArrayWithInt grabber = (RandomGrabArrayWithInt) prio.get(retryCount);
-		if(grabber == null) {
-			grabber = new RandomGrabArrayWithInt(random, retryCount);
-			prio.add(grabber);
-			Logger.minor(this, "Registering retry count "+retryCount+" with prioclass "+priorityClass);
- }
- return grabber;
- }
-
- /**
-	 * Should not be called often as can be slow if there are many requests of the same
-	 * priority and retry count. Priority and retry count must be the same as they were
- * when it was added.
- */
- public synchronized void remove(SendableRequest req) {
- // Should not be called often.
- int prio = req.getPriorityClass();
- int retryCount = req.getRetryCount();
- SortedVectorByNumber s = priorities[prio];
- if(s == null) return;
- if(s.isEmpty()) return;
- RandomGrabArrayWithInt grabber =
- (RandomGrabArrayWithInt) s.get(retryCount);
- if(grabber == null) return;
- grabber.remove(req);
- if(grabber.isEmpty()) {
- s.remove(retryCount);
- if(s.isEmpty())
- priorities[prio] = null;
- }
- }
-
- public synchronized SendableRequest removeFirst() {
- // Priorities start at 0
- for(int i=0;i<RequestStarter.MINIMUM_PRIORITY_CLASS;i++) {
- SortedVectorByNumber s = priorities[i];
- if(s == null) {
- Logger.minor(this, "Priority "+i+" is null");
- continue;
- }
-			RandomGrabArrayWithInt rga = (RandomGrabArrayWithInt) s.getFirst(); // will discard finished items
-			if(rga == null) {
-				Logger.minor(this, "No retrycount's in priority "+i);
-				priorities[i] = null;
-				continue;
-			}
-			SendableRequest req = (SendableRequest) rga.removeRandom();
-			if(rga.isEmpty()) {
-				s.remove(rga.getNumber());
-				if(s.isEmpty()) {
-					priorities[i] = null;
-				}
-			}
-			if(req == null) {
-				Logger.minor(this, "No requests in priority "+i+", retrycount "+rga.getNumber());
- continue;
- }
- Logger.minor(this, "removeFirst() returning "+req);
- return req;
- }
- Logger.minor(this, "No requests to run");
- return null;
- }
-}
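[The deleted scheduler's structure comment describes a three-level index: an array by priority class, a sorted structure by retry count within each priority, and a random-pick bag at the leaves; removeFirst() walks priorities in order, takes the lowest retry count, and draws a random entry. A toy model of that selection order follows, using standard collections in place of SortedVectorByNumber and RandomGrabArray; all sketch names are invented.]

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy model of the priority -> retry-count -> random-grab structure
// described above. TreeMap stands in for SortedVectorByNumber and
// ArrayList+Random for RandomGrabArray.
public class SchedulerSketch {

	private final SortedMap[] priorities; // one TreeMap per priority class
	private final Random random = new Random();

	public SchedulerSketch(int numPriorities) {
		priorities = new TreeMap[numPriorities];
	}

	public synchronized void register(int priority, int retryCount, Object req) {
		if(priorities[priority] == null)
			priorities[priority] = new TreeMap();
		List bag = (List) priorities[priority].get(new Integer(retryCount));
		if(bag == null) {
			bag = new ArrayList();
			priorities[priority].put(new Integer(retryCount), bag);
		}
		bag.add(req);
	}

	// Best priority first; within it, lowest retry count; within that, random.
	public synchronized Object removeFirst() {
		for(int i = 0; i < priorities.length; i++) {
			SortedMap byRetries = priorities[i];
			if(byRetries == null || byRetries.isEmpty()) continue;
			List bag = (List) byRetries.get(byRetries.firstKey());
			Object req = bag.remove(random.nextInt(bag.size()));
			if(bag.isEmpty()) byRetries.remove(byRetries.firstKey());
			return req;
		}
		return null; // nothing to run
	}

	public static void main(String[] args) {
		SchedulerSketch s = new SchedulerSketch(4);
		s.register(1, 0, "b");
		s.register(0, 3, "a"); // better priority class wins despite more retries
		System.out.println(s.removeFirst()); // prints "a"
	}
}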
Copied: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java (from rev 7929, branches/async-client/src/freenet/client/async/ClientRequestScheduler.java)
Deleted: trunk/freenet/src/freenet/client/async/GetCompletionCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/GetCompletionCallback.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/GetCompletionCallback.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,16 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-
-/**
- * Callback called when part of a get request completes - either with a
- * Bucket full of data, or with an error.
- */
-public interface GetCompletionCallback {
-
- public void onSuccess(FetchResult result, ClientGetState state);
-
- public void onFailure(FetchException e, ClientGetState state);
-
-}
Copied: trunk/freenet/src/freenet/client/async/GetCompletionCallback.java (from rev 7929, branches/async-client/src/freenet/client/async/GetCompletionCallback.java)
Deleted: trunk/freenet/src/freenet/client/async/MinimalSplitfileBlock.java
===================================================================
--- branches/async-client/src/freenet/client/async/MinimalSplitfileBlock.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/MinimalSplitfileBlock.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,32 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.SplitfileBlock;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-
-public class MinimalSplitfileBlock implements SplitfileBlock {
-
- public final int number;
- Bucket data;
-
- public MinimalSplitfileBlock(int n) {
- this.number = n;
- }
-
- public int getNumber() {
- return number;
- }
-
- public boolean hasData() {
- return data != null;
- }
-
- public Bucket getData() {
- return data;
- }
-
- public void setData(Bucket data) {
- this.data = data;
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/MinimalSplitfileBlock.java (from rev 7929, branches/async-client/src/freenet/client/async/MinimalSplitfileBlock.java)
Deleted: trunk/freenet/src/freenet/client/async/MultiPutCompletionCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/MultiPutCompletionCallback.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,106 +0,0 @@
-package freenet.client.async;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.ListIterator;
-
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.keys.ClientKey;
-import freenet.support.Logger;
-
-public class MultiPutCompletionCallback implements PutCompletionCallback, ClientPutState {
-
- private final LinkedList waitingFor;
- private final PutCompletionCallback cb;
- private ClientPutState generator;
- private final BaseClientPutter parent;
- private boolean finished;
- private boolean started;
-
-	public MultiPutCompletionCallback(PutCompletionCallback cb, BaseClientPutter parent) {
- this.cb = cb;
- this.waitingFor = new LinkedList();
- this.parent = parent;
- finished = false;
- }
-
- public synchronized void onSuccess(ClientPutState state) {
- if(finished) return;
- waitingFor.remove(state);
- if(waitingFor.isEmpty() && started) {
- complete(null);
- }
- }
-
-	public synchronized void onFailure(InserterException e, ClientPutState state) {
- if(finished) return;
- waitingFor.remove(state);
- if(waitingFor.isEmpty()) {
- complete(e);
- }
- }
-
- private synchronized void complete(InserterException e) {
- finished = true;
- if(e != null)
- cb.onFailure(e, this);
- else
- cb.onSuccess(this);
- }
-
- public synchronized void addURIGenerator(ClientPutState ps) {
- add(ps);
- generator = ps;
- }
-
- public synchronized void add(ClientPutState ps) {
- if(finished) return;
- waitingFor.add(ps);
- }
-
- public synchronized void arm() {
- started = true;
- }
-
- public BaseClientPutter getParent() {
- return parent;
- }
-
- public void onEncode(ClientKey key, ClientPutState state) {
- synchronized(this) {
- if(state != generator) return;
- }
- cb.onEncode(key, this);
- }
-
- public void cancel() {
- ClientPutState[] states = new ClientPutState[waitingFor.size()];
- synchronized(this) {
- states = (ClientPutState[]) waitingFor.toArray(states);
- }
- for(int i=0;i<states.length;i++)
- states[i].cancel();
- }
-
-	public synchronized void onTransition(ClientPutState oldState, ClientPutState newState) {
- if(generator == oldState)
- generator = newState;
- if(oldState == newState) return;
- for(ListIterator i = waitingFor.listIterator(0);i.hasNext();) {
- if(i.next() == oldState) {
- i.remove();
- i.add(newState);
- }
- }
- }
-
- public void onMetadata(Metadata m, ClientPutState state) {
- if(generator == state) {
- cb.onMetadata(m, this);
- } else {
- Logger.error(this, "Got metadata for "+state);
- }
- }
-
-}
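[MultiPutCompletionCallback above completes only once arm() has been called and every registered child state has reported in; a child succeeding before arm() merely shrinks the waiting set. That ordering avoids a race where an early finisher completes the aggregate while children are still being added. A condensed sketch follows, with invented names; unlike the original, this sketch's arm() also completes immediately when nothing is pending, a detail the original leaves to the caller.]

import java.util.HashSet;
import java.util.Set;

// Condensed model of the arm()/onSuccess() interplay above.
public class AggregateCompletionSketch {

	private final Set waitingFor = new HashSet();
	private boolean started = false;
	private boolean finished = false;

	public synchronized void add(Object child) {
		if(finished) return;
		waitingFor.add(child);
	}

	public synchronized void childSucceeded(Object child) {
		if(finished) return;
		waitingFor.remove(child);
		// Without the started flag, an early finisher could complete the
		// aggregate before all children were even added.
		if(waitingFor.isEmpty() && started)
			complete();
	}

	public synchronized void arm() {
		started = true;
		if(waitingFor.isEmpty())
			complete();
	}

	private void complete() {
		finished = true;
		System.out.println("all children finished");
	}

	public static void main(String[] args) {
		AggregateCompletionSketch agg = new AggregateCompletionSketch();
		agg.add("a");
		agg.childSucceeded("a"); // does not complete: not armed yet
		agg.add("b");
		agg.arm();
		agg.childSucceeded("b"); // completes now
	}
}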
Copied: trunk/freenet/src/freenet/client/async/MultiPutCompletionCallback.java (from rev 7929, branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java)
Deleted: trunk/freenet/src/freenet/client/async/PutCompletionCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/PutCompletionCallback.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/PutCompletionCallback.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,26 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.keys.ClientKey;
-
-/**
- * Callback called when part of a put request completes.
- */
-public interface PutCompletionCallback {
-
- public void onSuccess(ClientPutState state);
-
- public void onFailure(InserterException e, ClientPutState state);
-
- public void onEncode(ClientKey key, ClientPutState state);
-
-	public void onTransition(ClientPutState oldState, ClientPutState newState);
-
- /** Only called if explicitly asked for, in which case, generally
- * the metadata won't be inserted. Won't be called if there isn't
- * any!
- */
- public void onMetadata(Metadata m, ClientPutState state);
-
-}
Copied: trunk/freenet/src/freenet/client/async/PutCompletionCallback.java (from rev 7929, branches/async-client/src/freenet/client/async/PutCompletionCallback.java)
Deleted: trunk/freenet/src/freenet/client/async/RequestScheduler.java
===================================================================
--- branches/async-client/src/freenet/client/async/RequestScheduler.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/RequestScheduler.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,7 +0,0 @@
-package freenet.client.async;
-
-public interface RequestScheduler {
-
- public SendableRequest removeFirst();
-
-}
Copied: trunk/freenet/src/freenet/client/async/RequestScheduler.java (from rev 7929, branches/async-client/src/freenet/client/async/RequestScheduler.java)
Deleted: trunk/freenet/src/freenet/client/async/SendableGet.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableGet.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SendableGet.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,20 +0,0 @@
-package freenet.client.async;
-
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.node.LowLevelGetException;
-
-/**
- * A low-level key fetch which can be sent immediately. @see SendableRequest
- */
-public interface SendableGet extends SendableRequest {
-
- public ClientKey getKey();
-
- /** Called when/if the low-level request succeeds. */
- public void onSuccess(ClientKeyBlock block);
-
- /** Called when/if the low-level request fails. */
- public void onFailure(LowLevelGetException e);
-
-}
Copied: trunk/freenet/src/freenet/client/async/SendableGet.java (from rev 7929, branches/async-client/src/freenet/client/async/SendableGet.java)
Deleted: trunk/freenet/src/freenet/client/async/SendableInsert.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableInsert.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SendableInsert.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,26 +0,0 @@
-package freenet.client.async;
-
-import freenet.keys.ClientKeyBlock;
-import freenet.node.LowLevelPutException;
-
-/**
- * Callback interface for a low level insert, which is immediately sendable. These
- * should be registered on the ClientRequestScheduler when we want to send them. It will
- * then, when it is time to send, create a thread, send the request, and call the
- * callback below.
- */
-public interface SendableInsert extends SendableRequest {
-
- /** Get the ClientKeyBlock to insert. This may be created
- * just-in-time, and may return null; ClientRequestScheduler
- * will simply unregister the SendableInsert if this happens.
- */
- public ClientKeyBlock getBlock();
-
- /** Called when we successfully insert the data */
- public void onSuccess();
-
- /** Called when we don't! */
- public void onFailure(LowLevelPutException e);
-
-}
Copied: trunk/freenet/src/freenet/client/async/SendableInsert.java (from rev 7929, branches/async-client/src/freenet/client/async/SendableInsert.java)
Deleted: trunk/freenet/src/freenet/client/async/SendableRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableRequest.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SendableRequest.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,19 +0,0 @@
-package freenet.client.async;
-
-import freenet.node.Node;
-import freenet.support.RandomGrabArrayItem;
-
-/**
- * A low-level request which can be sent immediately. These are registered
- * on the ClientRequestScheduler.
- */
-public interface SendableRequest extends RandomGrabArrayItem {
-
- public short getPriorityClass();
-
- public int getRetryCount();
-
- /** ONLY called by RequestStarter */
- public void send(Node node);
-
-}
Copied: trunk/freenet/src/freenet/client/async/SendableRequest.java (from rev 7929, branches/async-client/src/freenet/client/async/SendableRequest.java)
Deleted: trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SimpleManifestPutter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,291 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-
-import freenet.client.ClientMetadata;
-import freenet.client.DefaultMIMETypes;
-import freenet.client.InsertBlock;
-import freenet.client.InserterContext;
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.keys.ClientKey;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-
-public class SimpleManifestPutter extends BaseClientPutter implements PutCompletionCallback {
- // Only implements PutCompletionCallback for the final metadata insert
-
-	private class PutHandler extends BaseClientPutter implements PutCompletionCallback {
-
-		protected PutHandler(String name, Bucket data, ClientMetadata cm, boolean getCHKOnly) throws InserterException {
-			super(SimpleManifestPutter.this.getPriorityClass(), SimpleManifestPutter.this.scheduler);
-			this.name = name;
-			this.cm = cm;
-			InsertBlock block =
-				new InsertBlock(data, cm, FreenetURI.EMPTY_CHK_URI);
-			this.origSFI =
-				new SingleFileInserter(this, this, block, false, ctx, false, true, getCHKOnly, false);
- currentState = origSFI;
- metadata = null;
- }
-
- private SingleFileInserter origSFI;
- private ClientPutState currentState;
- private ClientMetadata cm;
- private final String name;
- private byte[] metadata;
- private boolean finished;
-
- public void start() throws InserterException {
- origSFI.start();
- origSFI = null;
- }
-
- public FreenetURI getURI() {
- return null;
- }
-
- public boolean isFinished() {
-			return finished || cancelled || SimpleManifestPutter.this.cancelled;
- }
-
- public void onSuccess(ClientPutState state) {
- Logger.minor(this, "Completed "+this);
- synchronized(SimpleManifestPutter.this) {
- runningPutHandlers.remove(this);
- if(!runningPutHandlers.isEmpty()) {
- return;
- }
- }
- insertedAllFiles();
- }
-
-		public void onFailure(InserterException e, ClientPutState state) {
- Logger.minor(this, "Failed: "+this+" - "+e, e);
- fail(e);
- }
-
- public void onEncode(ClientKey key, ClientPutState state) {
- if(metadata == null) {
- // Don't have metadata yet
- // Do have key
- // So make a redirect to the key
-				Metadata m =
-					new Metadata(Metadata.SIMPLE_REDIRECT, key.getURI(), cm);
- onMetadata(m, null);
- }
- }
-
-		public void onTransition(ClientPutState oldState, ClientPutState newState) {
-			if(oldState == this) {
-				// We do not need to have a hashtable of state -> PutHandler.
-				// Because we can just pull the parent off the state!
- this.currentState = newState;
- }
- }
-
- public void onMetadata(Metadata m, ClientPutState state) {
- if(metadata != null) {
- Logger.error(this, "Reassigning metadata", new
Exception("debug"));
- return;
- }
- metadata = m.writeToByteArray();
- synchronized(SimpleManifestPutter.this) {
- putHandlersWaitingForMetadata.remove(this);
-				if(!putHandlersWaitingForMetadata.isEmpty()) return;
- gotAllMetadata();
- }
- }
- }
-
- private final HashMap putHandlersByName;
- private final HashSet runningPutHandlers;
- private final HashSet putHandlersWaitingForMetadata;
- private FreenetURI finalURI;
- private FreenetURI targetURI;
- private boolean finished;
- private final InserterContext ctx;
- private final ClientCallback cb;
- private final boolean getCHKOnly;
- private boolean insertedAllFiles;
- private boolean insertedManifest;
- private ClientPutState currentMetadataInserterState;
- private final String defaultName;
-	private final static String[] defaultDefaultNames =
-		new String[] { "index.html", "index.htm", "default.html", "default.htm" };
-
-	public SimpleManifestPutter(ClientCallback cb, ClientRequestScheduler sched,
-			HashMap bucketsByName, short prioClass, FreenetURI target,
-			String defaultName, InserterContext ctx, boolean getCHKOnly) throws InserterException {
- super(prioClass, sched);
- this.defaultName = defaultName;
- this.targetURI = target;
- this.cb = cb;
- this.ctx = ctx;
- this.getCHKOnly = getCHKOnly;
- putHandlersByName = new HashMap();
- runningPutHandlers = new HashSet();
- putHandlersWaitingForMetadata = new HashSet();
- Iterator it = bucketsByName.keySet().iterator();
- while(it.hasNext()) {
- String name = (String) it.next();
- Bucket data = (Bucket) bucketsByName.get(name);
- String mimeType = DefaultMIMETypes.guessMIMEType(name);
- ClientMetadata cm;
- if(mimeType.equals(DefaultMIMETypes.DEFAULT_MIME_TYPE))
- cm = null;
- else
- cm = new ClientMetadata(mimeType);
- PutHandler ph;
- try {
- ph = new PutHandler(name, data, cm, getCHKOnly);
- } catch (InserterException e) {
- cancelAndFinish();
- throw e;
- }
- putHandlersByName.put(name, ph);
- runningPutHandlers.add(ph);
- putHandlersWaitingForMetadata.add(ph);
- }
- it = putHandlersByName.values().iterator();
- while(it.hasNext()) {
- PutHandler ph = (PutHandler) it.next();
- try {
- ph.start();
- } catch (InserterException e) {
- cancelAndFinish();
- throw e;
- }
- }
- }
-
- public FreenetURI getURI() {
- return finalURI;
- }
-
- public boolean isFinished() {
- return finished || cancelled;
- }
-
- private void gotAllMetadata() {
- Logger.minor(this, "Got all metadata");
- HashMap namesToByteArrays = new HashMap();
- Iterator i = putHandlersByName.values().iterator();
- while(i.hasNext()) {
- PutHandler ph = (PutHandler) i.next();
- String name = ph.name;
- byte[] meta = ph.metadata;
- namesToByteArrays.put(name, meta);
- }
- if(defaultName != null) {
-			byte[] meta = (byte[]) namesToByteArrays.get(defaultName);
-			if(meta == null) {
-				fail(new InserterException(InserterException.INVALID_URI, "Default name "+defaultName+" does not exist", null));
- return;
- }
- namesToByteArrays.put("", meta);
- } else {
- for(int j=0;j<defaultDefaultNames.length;j++) {
- String name = defaultDefaultNames[j];
-				byte[] meta = (byte[]) namesToByteArrays.get(name);
- if(meta != null) {
- namesToByteArrays.put("", meta);
- break;
- }
- }
- }
-		Metadata meta =
-			Metadata.mkRedirectionManifestWithMetadata(namesToByteArrays);
- Bucket bucket;
- try {
-			bucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
- } catch (IOException e) {
-			fail(new InserterException(InserterException.BUCKET_ERROR, e, null));
- return;
- }
- InsertBlock block =
- new InsertBlock(bucket, null, targetURI);
- try {
-			SingleFileInserter metadataInserter =
-				new SingleFileInserter(this, this, block, true, ctx, false, false, getCHKOnly, false);
- this.currentMetadataInserterState = metadataInserter;
- metadataInserter.start();
- } catch (InserterException e) {
- fail(e);
- }
- }
-
- private void insertedAllFiles() {
- synchronized(this) {
- insertedAllFiles = true;
- if(finished || cancelled) return;
- if(!insertedManifest) return;
- finished = true;
- }
- complete();
- }
-
- private void complete() {
- cb.onSuccess(this);
- }
-
- private void fail(InserterException e) {
- // Cancel all, then call the callback
- cancelAndFinish();
-
- cb.onFailure(e, this);
- }
-
- private void cancelAndFinish() {
- PutHandler[] running;
- synchronized(this) {
- if(finished) return;
-			running = (PutHandler[]) runningPutHandlers.toArray(new PutHandler[runningPutHandlers.size()]);
- finished = true;
- }
-
- for(int i=0;i<running.length;i++) {
- running[i].cancel();
- }
- }
-
-	public void onSuccess(ClientPutState state) {
-		synchronized(this) {
-			insertedManifest = true;
-			// Mirror insertedAllFiles(): bail out if already done, then wait
-			// for the file inserts before completing. (The original tested
-			// !finished, which could never reach complete().)
-			if(finished || cancelled) return;
-			if(!insertedAllFiles) return;
-			finished = true;
-		}
-		complete();
-	}
-
- public void onFailure(InserterException e, ClientPutState state) {
- // FIXME check state == currentMetadataInserterState ??
- fail(e);
- }
-
- public void onEncode(ClientKey key, ClientPutState state) {
- this.finalURI = key.getURI();
- Logger.minor(this, "Got metadata key: "+finalURI);
- cb.onGeneratedURI(finalURI, this);
- }
-
-	public void onTransition(ClientPutState oldState, ClientPutState newState) {
-		if(oldState == currentMetadataInserterState)
-			currentMetadataInserterState = newState;
-		else
-			Logger.error(this, "Current state = "+currentMetadataInserterState+" called onTransition(old="+oldState+", new="+newState+")",
-					new Exception("debug"));
-	}
-
-	public void onMetadata(Metadata m, ClientPutState state) {
-		Logger.error(this, "Got metadata from "+state+" on "+this+" (metadata inserter = "+currentMetadataInserterState+")");
-		fail(new InserterException(InserterException.INTERNAL_ERROR));
-	}
-
-}
Copied: trunk/freenet/src/freenet/client/async/SimpleManifestPutter.java (from rev 7929, branches/async-client/src/freenet/client/async/SimpleManifestPutter.java)
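SimpleManifestPutter, deleted above and re-added from the branch, gates completion on two independent events: every PutHandler finishing and the manifest metadata insert succeeding, with flags checked under one lock so success fires exactly once. A minimal, self-contained sketch of that gating pattern follows; the names are illustrative only, not Freenet API, and it uses newer Java syntax than the 1.4-era code above.

    // Two independent events must both occur, exactly once, before
    // completion fires; flags are checked and flipped under one lock.
    final class CompletionGate {
        private boolean filesDone, metaDone, finished;
        private final Runnable onComplete;

        CompletionGate(Runnable onComplete) { this.onComplete = onComplete; }

        void filesInserted()    { fireIfReady(true, false); }
        void metadataInserted() { fireIfReady(false, true); }

        private void fireIfReady(boolean files, boolean meta) {
            boolean fire;
            synchronized (this) {
                filesDone |= files;
                metaDone  |= meta;
                fire = filesDone && metaDone && !finished;
                if (fire) finished = true;
            }
            if (fire) onComplete.run(); // call back outside the lock
        }
    }

The same shape appears again in SingleFileInserter.SplitHandler further down, where the two events are the splitfile insert and the metadata insert.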
Deleted: trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleBlockInserter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SingleBlockInserter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,220 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-import java.net.MalformedURLException;
-
-import freenet.client.FailureCodeTracker;
-import freenet.client.InserterContext;
-import freenet.client.InserterException;
-import freenet.keys.CHKEncodeException;
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.FreenetURI;
-import freenet.keys.InsertableClientSSK;
-import freenet.keys.SSKEncodeException;
-import freenet.node.LowLevelPutException;
-import freenet.node.Node;
-import freenet.support.Bucket;
-import freenet.support.Logger;
-
-/**
- * Insert *ONE KEY*.
- */
-public class SingleBlockInserter implements SendableInsert, ClientPutState {
-
- final Bucket sourceData;
- final short compressionCodec;
-	final FreenetURI uri; // uses essentially no RAM in the common case of a CHK because we use FreenetURI.EMPTY_CHK_URI
- FreenetURI resultingURI;
- final PutCompletionCallback cb;
- final BaseClientPutter parent;
- final InserterContext ctx;
- private int retries;
- private final FailureCodeTracker errors;
- private boolean finished;
- private ClientKey key;
- private SoftReference refToClientKeyBlock;
- final int token; // for e.g. splitfiles
- final boolean isMetadata;
- final int sourceLength;
-
-	public SingleBlockInserter(BaseClientPutter parent, Bucket data, short compressionCodec, FreenetURI uri,
-			InserterContext ctx, PutCompletionCallback cb, boolean isMetadata, int sourceLength, int token,
-			boolean getCHKOnly) throws InserterException {
- this.token = token;
- this.parent = parent;
- this.retries = 0;
- this.finished = false;
- this.ctx = ctx;
- errors = new FailureCodeTracker(true);
- this.cb = cb;
- this.uri = uri;
- this.compressionCodec = compressionCodec;
- this.sourceData = data;
- this.isMetadata = isMetadata;
- this.sourceLength = sourceLength;
- if(getCHKOnly) {
- ClientKeyBlock block = encode();
- cb.onEncode(block.getClientKey(), this);
- cb.onSuccess(this);
- finished = true;
- }
- }
-
-	protected ClientKeyBlock innerEncode() throws InserterException {
-		String uriType = uri.getKeyType().toUpperCase();
-		if(uriType.equals("CHK")) {
-			try {
-				return ClientCHKBlock.encode(sourceData, isMetadata, compressionCodec == -1, compressionCodec, sourceLength);
-			} catch (CHKEncodeException e) {
-				Logger.error(this, "Caught "+e, e);
-				throw new InserterException(InserterException.INTERNAL_ERROR, e, null);
-			} catch (IOException e) {
-				Logger.error(this, "Caught "+e, e);
-				throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-			}
-		} else if(uriType.equals("SSK") || uriType.equals("KSK")) {
-			try {
-				InsertableClientSSK ik = InsertableClientSSK.create(uri);
-				return ik.encode(sourceData, isMetadata, compressionCodec == -1, compressionCodec, sourceLength, ctx.random);
-			} catch (MalformedURLException e) {
-				throw new InserterException(InserterException.INVALID_URI, e, null);
-			} catch (SSKEncodeException e) {
-				Logger.error(this, "Caught "+e, e);
-				throw new InserterException(InserterException.INTERNAL_ERROR, e, null);
-			} catch (IOException e) {
-				Logger.error(this, "Caught "+e, e);
-				throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-			}
-		} else {
-			throw new InserterException(InserterException.INVALID_URI, "Unknown keytype "+uriType, null);
-		}
-	}
-
-	protected synchronized ClientKeyBlock encode() throws InserterException {
-		if(refToClientKeyBlock != null) {
-			ClientKeyBlock block = (ClientKeyBlock) refToClientKeyBlock.get();
-			if(block != null) return block;
-		}
-		ClientKeyBlock block = innerEncode();
-		refToClientKeyBlock = new SoftReference(block);
-		resultingURI = block.getClientKey().getURI();
-		cb.onEncode(block.getClientKey(), this);
-		return block;
-	}
-
- public boolean isInsert() {
- return true;
- }
-
- public short getPriorityClass() {
- return parent.getPriorityClass();
- }
-
- public int getRetryCount() {
- return retries;
- }
-
-	public void onFailure(LowLevelPutException e) {
-		// Each terminal failure must return; the original fell through and
-		// re-registered the request even after calling fail().
-		if(parent.isCancelled()) {
-			fail(new InserterException(InserterException.CANCELLED));
-			return;
-		}
-		if(e.code == LowLevelPutException.COLLISION) {
-			fail(new InserterException(InserterException.COLLISION));
-			return;
-		}
-		switch(e.code) {
-		case LowLevelPutException.INTERNAL_ERROR:
-			errors.inc(InserterException.INTERNAL_ERROR);
-			break;
-		case LowLevelPutException.REJECTED_OVERLOAD:
-			errors.inc(InserterException.REJECTED_OVERLOAD);
-			break;
-		case LowLevelPutException.ROUTE_NOT_FOUND:
-			errors.inc(InserterException.ROUTE_NOT_FOUND);
-			break;
-		case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
-			errors.inc(InserterException.ROUTE_REALLY_NOT_FOUND);
-			break;
-		default:
-			Logger.error(this, "Unknown LowLevelPutException code: "+e.code);
-			errors.inc(InserterException.INTERNAL_ERROR);
-		}
-		Logger.minor(this, "Failed: "+e);
-		if(retries > ctx.maxInsertRetries) {
-			if(errors.isOneCodeOnly())
-				fail(new InserterException(errors.getFirstCode()));
-			else
-				fail(new InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, errors, getURI()));
-			return;
-		}
-		retries++;
-		parent.scheduler.register(this);
-	}
-
- private synchronized void fail(InserterException e) {
- if(finished) return;
- finished = true;
- cb.onFailure(e, this);
- }
-
- public ClientKeyBlock getBlock() {
- try {
- if(finished) return null;
- return encode();
- } catch (InserterException e) {
- cb.onFailure(e, this);
- return null;
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
-			cb.onFailure(new InserterException(InserterException.INTERNAL_ERROR, t, null), this);
- return null;
- }
- }
-
- public void schedule() {
- if(finished) return;
- parent.scheduler.register(this);
- }
-
- public FreenetURI getURI() {
- if(resultingURI == null)
- getBlock();
- return resultingURI;
- }
-
- public void onSuccess() {
- Logger.minor(this, "Succeeded ("+this+"): "+token);
- synchronized(this) {
- finished = true;
- }
- cb.onSuccess(this);
- }
-
- public BaseClientPutter getParent() {
- return parent;
- }
-
- public void cancel() {
- synchronized(this) {
- if(finished) return;
- finished = true;
- }
-		cb.onFailure(new InserterException(InserterException.CANCELLED), this);
- }
-
- public boolean isFinished() {
- return finished;
- }
-
- public void send(Node node) {
- try {
- Logger.minor(this, "Starting request: "+this);
- node.realPut(getBlock(), true);
- } catch (LowLevelPutException e) {
- onFailure(e);
- Logger.minor(this, "Request failed: "+this+" for "+e);
- return;
- }
- Logger.minor(this, "Request succeeded: "+this);
- onSuccess();
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/SingleBlockInserter.java (from rev 7929, branches/async-client/src/freenet/client/async/SingleBlockInserter.java)
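SingleBlockInserter's encode() caches the encoded ClientKeyBlock behind a SoftReference: the GC may drop the block under memory pressure, and innerEncode() deterministically rebuilds it on the next request. A generic sketch of that cache shape follows; it is illustrative, not Freenet API, and uses generics that the 2006 code predates.

    import java.lang.ref.SoftReference;
    import java.util.function.Supplier;

    // Memoize an expensive, deterministic computation behind a SoftReference;
    // recompute transparently if the GC has reclaimed the cached value.
    final class SoftCache<T> {
        private final Supplier<T> producer;
        private SoftReference<T> ref;

        SoftCache(Supplier<T> producer) { this.producer = producer; }

        synchronized T get() {
            if (ref != null) {
                T cached = ref.get();
                if (cached != null) return cached;
            }
            T fresh = producer.get(); // e.g. re-encode the block
            ref = new SoftReference<>(fresh);
            return fresh;
        }
    }

This only works because the encode is deterministic: a recomputed block is identical to the one the GC discarded.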
Deleted: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileFetcher.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,493 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.LinkedList;
-
-import freenet.client.ArchiveContext;
-import freenet.client.ArchiveFailureException;
-import freenet.client.ArchiveRestartException;
-import freenet.client.ArchiveStoreContext;
-import freenet.client.ClientMetadata;
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-import freenet.client.FetcherContext;
-import freenet.client.Metadata;
-import freenet.client.MetadataParseException;
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.FreenetURI;
-import freenet.keys.KeyDecodeException;
-import freenet.node.LowLevelGetException;
-import freenet.node.LowLevelPutException;
-import freenet.node.Node;
-import freenet.support.Bucket;
-import freenet.support.Logger;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-
-public class SingleFileFetcher extends ClientGetState implements SendableGet {
-
- final ClientGetter parent;
- //final FreenetURI uri;
- final ClientKey key;
- final LinkedList metaStrings;
- final FetcherContext ctx;
- final GetCompletionCallback rcb;
- final ClientMetadata clientMetadata;
- private Metadata metadata;
- final int maxRetries;
- final ArchiveContext actx;
- /** Archive handler. We can only have one archive handler at a time. */
- private ArchiveStoreContext ah;
- private int recursionLevel;
- /** The URI of the currently-being-processed data, for archives etc. */
- private FreenetURI thisKey;
- private int retryCount;
- private final LinkedList decompressors;
- private final boolean dontTellClientGet;
- private boolean cancelled;
- private Object token;
-
-
- /** Create a new SingleFileFetcher and register self.
- * Called when following a redirect, or direct from ClientGet.
- * @param token
- * @param dontTellClientGet
- */
-	public SingleFileFetcher(ClientGetter get, GetCompletionCallback cb, ClientMetadata metadata, ClientKey key,
-			LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel,
-			boolean dontTellClientGet, Object token) throws FetchException {
- Logger.minor(this, "Creating SingleFileFetcher for "+key);
- this.cancelled = false;
- this.dontTellClientGet = dontTellClientGet;
- this.token = token;
- this.parent = get;
- //this.uri = uri;
- //this.key = ClientKey.getBaseKey(uri);
- //metaStrings = uri.listMetaStrings();
- this.key = key;
- this.metaStrings = metaStrings;
- this.ctx = ctx;
- retryCount = 0;
- this.rcb = cb;
- this.clientMetadata = metadata;
- this.maxRetries = maxRetries;
- thisKey = key.getURI();
- this.actx = actx;
- this.recursionLevel = recursionLevel + 1;
-		if(recursionLevel > ctx.maxRecursionLevel)
-			throw new FetchException(FetchException.TOO_MUCH_RECURSION);
-		this.decompressors = new LinkedList();
- }
-
-	/** Called by ClientGet. */
-	public SingleFileFetcher(ClientGetter get, GetCompletionCallback cb, ClientMetadata metadata, FreenetURI uri,
-			FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet,
-			Object token) throws MalformedURLException, FetchException {
-		this(get, cb, metadata, ClientKey.getBaseKey(uri), uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, dontTellClientGet, token);
-	}
-
-	/** Copy constructor, modifies a few given fields, don't call schedule() */
-	public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta, GetCompletionCallback callback, FetcherContext ctx2) throws FetchException {
-		Logger.minor(this, "Creating SingleFileFetcher for "+fetcher.key);
- this.token = fetcher.token;
- this.dontTellClientGet = fetcher.dontTellClientGet;
- this.actx = fetcher.actx;
- this.ah = fetcher.ah;
- this.clientMetadata = fetcher.clientMetadata;
- this.ctx = ctx2;
- this.key = fetcher.key;
- this.maxRetries = fetcher.maxRetries;
- this.metadata = newMeta;
- this.metaStrings = fetcher.metaStrings;
- this.parent = fetcher.parent;
- this.rcb = callback;
- this.retryCount = 0;
- this.recursionLevel = fetcher.recursionLevel + 1;
-		if(recursionLevel > ctx.maxRecursionLevel)
-			throw new FetchException(FetchException.TOO_MUCH_RECURSION);
- this.thisKey = fetcher.thisKey;
- this.decompressors = fetcher.decompressors;
- }
-
- public void schedule() {
- if(!dontTellClientGet)
- this.parent.currentState = this;
- parent.scheduler.register(this);
- }
-
- public ClientGetter getParent() {
- return parent;
- }
-
- public ClientKey getKey() {
- return key;
- }
-
- public short getPriorityClass() {
- return parent.getPriorityClass();
- }
-
- public int getRetryCount() {
- return retryCount;
- }
-
- // Process the completed data. May result in us going to a
- // splitfile, or another SingleFileFetcher, etc.
- public void onSuccess(ClientKeyBlock block) {
- // Extract data
- Bucket data;
-		try {
-			data = block.decode(ctx.bucketFactory, (int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)));
-		} catch (KeyDecodeException e1) {
-			onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()));
-			return;
-		} catch (IOException e) {
-			Logger.error(this, "Could not capture data - disk full?: "+e, e);
-			onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
-			return;
- }
- if(!block.isMetadata()) {
- onSuccess(new FetchResult(clientMetadata, data));
- } else {
-			if(!ctx.followRedirects) {
-				onFailure(new FetchException(FetchException.INVALID_METADATA, "Told me not to follow redirects (splitfile block??)"));
-				return;
-			}
-			if(parent.isCancelled()) {
-				onFailure(new FetchException(FetchException.CANCELLED));
-				return;
-			}
-			if(data.size() > ctx.maxMetadataSize) {
-				onFailure(new FetchException(FetchException.TOO_BIG_METADATA));
-				return;
- }
- // Parse metadata
- try {
- metadata = Metadata.construct(data);
- } catch (MetadataParseException e) {
- onFailure(new FetchException(e));
- return;
- } catch (IOException e) {
- // Bucket error?
-				onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
- return;
- }
- try {
- handleMetadata();
- } catch (MetadataParseException e) {
- onFailure(new FetchException(e));
- return;
- } catch (FetchException e) {
- onFailure(e);
- return;
- } catch (ArchiveFailureException e) {
- onFailure(new FetchException(e));
- } catch (ArchiveRestartException e) {
- onFailure(new FetchException(e));
- }
- }
- }
-
-	private void onSuccess(FetchResult result) {
-		if(!decompressors.isEmpty()) {
-			Bucket data = result.asBucket();
-			while(!decompressors.isEmpty()) {
-				Compressor c = (Compressor) decompressors.removeLast();
-				try {
-					data = c.decompress(data, ctx.bucketFactory, Math.max(ctx.maxTempLength, ctx.maxOutputLength));
-				} catch (IOException e) {
-					onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
-					return;
-				} catch (CompressionOutputSizeException e) {
-					onFailure(new FetchException(FetchException.TOO_BIG, e));
-					return;
-				}
-			}
-			result = new FetchResult(result, data);
-		}
-		rcb.onSuccess(result, this);
-	}
-
-	private void handleMetadata() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
-		while(true) {
-			if(metadata.isSimpleManifest()) {
-				String name;
-				if(metaStrings.isEmpty())
-					name = null;
-				else
-					name = (String) metaStrings.removeFirst();
-				// Since metadata is a document, we just replace metadata here
-				if(name == null) {
-					metadata = metadata.getDefaultDocument();
-					if(metadata == null)
-						throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
-				} else {
-					metadata = metadata.getDocument(name);
-					thisKey = thisKey.pushMetaString(name);
-					if(metadata == null)
-						throw new FetchException(FetchException.NOT_IN_ARCHIVE);
-				}
-				continue; // loop
-			} else if(metadata.isArchiveManifest()) {
-				if(metaStrings.isEmpty() && ctx.returnZIPManifests) {
-					// Just return the archive, whole.
-					metadata.setSimpleRedirect();
-					continue;
-				}
-				// First we need the archive metadata.
-				// Then parse it.
-				// Then we may need to fetch something from inside the archive.
-				ah = (ArchiveStoreContext) ctx.archiveManager.makeHandler(thisKey, metadata.getArchiveType(), false);
-				// ah is set. This means we are currently handling an archive.
-				Bucket metadataBucket;
-				metadataBucket = ah.getMetadata(actx, null, recursionLevel+1, true);
-				if(metadataBucket != null) {
-					try {
-						metadata = Metadata.construct(metadataBucket);
-					} catch (IOException e) {
-						// Bucket error?
-						throw new FetchException(FetchException.BUCKET_ERROR, e);
-					}
-				} else {
-					fetchArchive(); // will result in this function being called again
-					return;
-				}
-				continue;
-			} else if(metadata.isArchiveInternalRedirect()) {
-				clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
-				// Fetch it from the archive
-				if(ah == null)
-					throw new FetchException(FetchException.UNKNOWN_METADATA, "Archive redirect not in an archive");
-				if(metaStrings.isEmpty())
-					throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
-				Bucket dataBucket = ah.get((String) metaStrings.removeFirst(), actx, null, recursionLevel+1, true);
-				if(dataBucket != null) {
-					// Return the data
-					onSuccess(new FetchResult(this.clientMetadata, dataBucket));
-					return;
-				} else {
-					// Metadata cannot contain pointers to files which don't exist.
-					// We enforce this in ArchiveHandler.
-					// Therefore, the archive needs to be fetched.
-					fetchArchive();
-					// Will call back into this function when it has been fetched.
-					return;
-				}
-			} else if(metadata.isMultiLevelMetadata()) {
-				// Fetch on a second SingleFileFetcher, like with archives.
-				Metadata newMeta = (Metadata) metadata.clone();
-				metadata.setSimpleRedirect();
-				SingleFileFetcher f = new SingleFileFetcher(this, newMeta, new MultiLevelMetadataCallback(), ctx);
-				f.handleMetadata();
-				return;
-			} else if(metadata.isSingleFileRedirect()) {
-				clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
-				// FIXME implement implicit archive support
-
-				// Simple redirect
-				// Just create a new SingleFileFetcher
-				// Which will then fetch the target URI, and call rcb's success
-				// Hopefully!
-				FreenetURI uri = metadata.getSingleTarget();
-				Logger.minor(this, "Redirecting to "+uri);
-				ClientKey key;
-				try {
-					key = ClientKey.getBaseKey(uri);
-				} catch (MalformedURLException e) {
-					throw new FetchException(FetchException.INVALID_URI, e);
-				}
-				LinkedList newMetaStrings = uri.listMetaStrings();
-
-				// Move any new meta strings to beginning of our list of remaining meta strings
-				while(!newMetaStrings.isEmpty()) {
-					Object o = newMetaStrings.removeLast();
-					metaStrings.addFirst(o);
-				}
-
-				SingleFileFetcher f = new SingleFileFetcher(parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, false, null);
-				if(metadata.isCompressed()) {
-					Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
-					f.addDecompressor(codec);
-				}
-				f.schedule();
-				// All done! No longer our problem!
-				return;
-			} else if(metadata.isSplitfile()) {
-				Logger.minor(this, "Fetching splitfile");
-				// FIXME implicit archive support
-
-				clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
-
-				// Splitfile (possibly compressed)
-
-				if(metadata.isCompressed()) {
-					Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
-					addDecompressor(codec);
-				}
-
-				SplitFileFetcher sf = new SplitFileFetcher(metadata, rcb, parent, ctx,
-						decompressors, clientMetadata, actx, recursionLevel);
-				sf.schedule();
-				// SplitFile will now run.
-				// Then it will return data to rcb.
-				// We are now out of the loop. Yay!
-				return;
-			} else {
-				Logger.error(this, "Don't know what to do with metadata: "+metadata);
-				throw new FetchException(FetchException.UNKNOWN_METADATA);
-			}
-		}
-	}
-
- private void addDecompressor(Compressor codec) {
- decompressors.addLast(codec);
- }
-
-	private void fetchArchive() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
-		// Fetch the archive
-		// How?
-		// Spawn a separate SingleFileFetcher,
-		// which fetches the archive, then calls
-		// our Callback, which unpacks the archive, then
-		// reschedules us.
-		Metadata newMeta = (Metadata) metadata.clone();
-		newMeta.setSimpleRedirect();
-		SingleFileFetcher f;
-		f = new SingleFileFetcher(this, newMeta, new ArchiveFetcherCallback(), new FetcherContext(ctx, FetcherContext.SET_RETURN_ARCHIVES));
-		f.handleMetadata();
-		// When it is done (if successful), the ArchiveCallback will re-call this function.
-		// Which will then discover that the metadata *is* available.
-		// And will also discover that the data is available, and will complete.
-	}
-
- class ArchiveFetcherCallback implements GetCompletionCallback {
-
-		public void onSuccess(FetchResult result, ClientGetState state) {
-			parent.currentState = SingleFileFetcher.this;
-			try {
-				ctx.archiveManager.extractToCache(thisKey, ah.getArchiveType(), result.asBucket(), actx, ah);
-			} catch (ArchiveFailureException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-				// Don't fall through to handleMetadata() after a failed extraction.
-				return;
-			} catch (ArchiveRestartException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-				return;
-			}
-			try {
-				handleMetadata();
-			} catch (MetadataParseException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-			} catch (FetchException e) {
-				SingleFileFetcher.this.onFailure(e);
-			} catch (ArchiveFailureException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-			} catch (ArchiveRestartException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-			}
-		}
-
- public void onFailure(FetchException e, ClientGetState state) {
- // Force fatal as the fetcher is presumed to have made
a reasonable effort.
- SingleFileFetcher.this.onFailure(e, true);
- }
-
- }
-
-	class MultiLevelMetadataCallback implements GetCompletionCallback {
-
-		public void onSuccess(FetchResult result, ClientGetState state) {
-			parent.currentState = SingleFileFetcher.this;
-			try {
-				metadata = Metadata.construct(result.asBucket());
-			} catch (MetadataParseException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-				return;
-			} catch (IOException e) {
-				// Bucket error?
-				SingleFileFetcher.this.onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
-				return;
-			}
-			// Process the metadata we just fetched; without this the request stalls here.
-			try {
-				handleMetadata();
-			} catch (MetadataParseException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-			} catch (FetchException e) {
-				SingleFileFetcher.this.onFailure(e);
-			} catch (ArchiveFailureException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-			} catch (ArchiveRestartException e) {
-				SingleFileFetcher.this.onFailure(new FetchException(e));
-			}
-		}
-
-		public void onFailure(FetchException e, ClientGetState state) {
-			// Pass it on; fetcher is assumed to have retried as appropriate already, so this is fatal.
-			SingleFileFetcher.this.onFailure(e, true);
-		}
-
-	}
-
-	private final void onFailure(FetchException e) {
-		onFailure(e, false);
-	}
-
-	// Real onFailure
-	private void onFailure(FetchException e, boolean forceFatal) {
-		if(parent.isCancelled() || cancelled) {
-			// Report cancellation directly; recursing into onFailure() here
-			// would re-enter this branch and never terminate.
-			rcb.onFailure(new FetchException(FetchException.CANCELLED), this);
-			return;
-		}
-		if(!(e.isFatal() || forceFatal)) {
-			if(retryCount <= maxRetries) {
-				retryCount++;
-				parent.scheduler.register(this);
-				return;
-			}
-		}
-		// :(
-		rcb.onFailure(e, this);
-	}
-
- // Translate it, then call the real onFailure
-	public void onFailure(LowLevelGetException e) {
-		// Each case must return; otherwise the switch falls through and
-		// reports every subsequent error code as well.
-		switch(e.code) {
-		case LowLevelGetException.DATA_NOT_FOUND:
-			onFailure(new FetchException(FetchException.DATA_NOT_FOUND));
-			return;
-		case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
-			onFailure(new FetchException(FetchException.DATA_NOT_FOUND));
-			return;
-		case LowLevelGetException.DECODE_FAILED:
-			onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR));
-			return;
-		case LowLevelGetException.INTERNAL_ERROR:
-			onFailure(new FetchException(FetchException.INTERNAL_ERROR));
-			return;
-		case LowLevelGetException.REJECTED_OVERLOAD:
-			onFailure(new FetchException(FetchException.REJECTED_OVERLOAD));
-			return;
-		case LowLevelGetException.ROUTE_NOT_FOUND:
-			onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND));
-			return;
-		case LowLevelGetException.TRANSFER_FAILED:
-			onFailure(new FetchException(FetchException.TRANSFER_FAILED));
-			return;
-		case LowLevelGetException.VERIFY_FAILED:
-			onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR));
-			return;
-		default:
-			Logger.error(this, "Unknown LowLevelGetException code: "+e.code);
-			onFailure(new FetchException(FetchException.INTERNAL_ERROR));
-		}
-	}
-
- public Object getToken() {
- return token;
- }
-
- public void cancel() {
- cancelled = true;
- }
-
- public boolean isFinished() {
- return cancelled;
- }
-
- /** Do the request, blocking. Called by RequestStarter. */
- public void send(Node node) {
- // Do we need to support the last 3?
- ClientKeyBlock block;
- try {
- block = node.realGetKey(key, false, false, false);
- } catch (LowLevelGetException e) {
- onFailure(e);
- return;
- } catch (Throwable t) {
-			onFailure(new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR));
- return;
- }
- onSuccess(block);
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java (from rev 7929, branches/async-client/src/freenet/client/async/SingleFileFetcher.java)
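handleMetadata() above accumulates codecs with addLast() while it walks nested redirects, and onSuccess() then strips them with removeLast(): last codec discovered, first codec undone, i.e. a LIFO chain. A self-contained sketch of that shape follows; it is illustrative only, not Freenet API.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.function.UnaryOperator;

    // Codecs found while walking redirect metadata are pushed in discovery
    // order, then applied in reverse once the raw data arrives.
    final class DecompressorChain {
        private final Deque<UnaryOperator<byte[]>> codecs = new ArrayDeque<>();

        void push(UnaryOperator<byte[]> codec) { codecs.addLast(codec); }

        byte[] apply(byte[] data) {
            while (!codecs.isEmpty())
                data = codecs.removeLast().apply(data); // last added, first undone
            return data;
        }
    }

The reversal matters when a compressed redirect points at compressed data: the outer layer was applied last during insert, so it must be removed first on fetch.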
Deleted: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileInserter.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,332 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-
-import freenet.client.InsertBlock;
-import freenet.client.InserterContext;
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.keys.CHKBlock;
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientKey;
-import freenet.keys.FreenetURI;
-import freenet.keys.SSKBlock;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-
-/**
- * Attempt to insert a file. May include metadata.
- *
- * This stage:
- * Attempt to compress the file. Off-thread if it will take a while.
- * Then hand it off to a SingleBlockInserter, or to a SplitFileInserter (via SplitHandler), as appropriate.
- */
-class SingleFileInserter implements ClientPutState {
-
- // Config option???
- private static final long COMPRESS_OFF_THREAD_LIMIT = 65536;
-
- final BaseClientPutter parent;
- final InsertBlock block;
- final InserterContext ctx;
- final boolean metadata;
- final PutCompletionCallback cb;
- final boolean getCHKOnly;
- /** If true, we are not the top level request, and should not
- * update our parent to point to us as current put-stage. */
- final boolean dontTellParent;
- private boolean cancelled = false;
- private boolean reportMetadataOnly;
-
- /**
- * @param parent
- * @param cb
- * @param block
- * @param metadata
- * @param ctx
- * @param dontCompress
- * @param dontTellParent
- * @param getCHKOnly
-	 * @param reportMetadataOnly If true, don't insert the metadata, just report it.
- * @throws InserterException
- */
-	SingleFileInserter(BaseClientPutter parent, PutCompletionCallback cb, InsertBlock block,
-			boolean metadata, InserterContext ctx, boolean dontCompress,
-			boolean dontTellParent, boolean getCHKOnly, boolean reportMetadataOnly) throws InserterException {
- this.reportMetadataOnly = reportMetadataOnly;
- this.parent = parent;
- this.block = block;
- this.ctx = ctx;
- this.metadata = metadata;
- this.cb = cb;
- this.dontTellParent = dontTellParent;
- this.getCHKOnly = getCHKOnly;
- }
-
- public void start() throws InserterException {
-		if((!ctx.dontCompress) && block.getData().size() > COMPRESS_OFF_THREAD_LIMIT) {
- // Run off thread
- OffThreadCompressor otc = new OffThreadCompressor();
- Thread t = new Thread(otc, "Compressor for "+this);
- t.setDaemon(true);
- t.start();
- } else {
- tryCompress();
- }
- }
-
- private class OffThreadCompressor implements Runnable {
- public void run() {
- try {
- tryCompress();
- } catch (InserterException e) {
- cb.onFailure(e, SingleFileInserter.this);
- }
- }
- }
-
- private void tryCompress() throws InserterException {
- // First, determine how small it needs to be
- Bucket origData = block.getData();
- Bucket data = origData;
- int blockSize;
- boolean dontCompress = ctx.dontCompress;
-
- long origSize = data.size();
- String type = block.desiredURI.getKeyType().toUpperCase();
- if(type.equals("SSK") || type.equals("KSK")) {
- blockSize = SSKBlock.DATA_LENGTH;
- } else if(type.equals("CHK")) {
- blockSize = CHKBlock.DATA_LENGTH;
- } else {
-			throw new InserterException(InserterException.INVALID_URI);
- }
-
- Compressor bestCodec = null;
- Bucket bestCompressedData = null;
-
-		if(origSize > blockSize && (!ctx.dontCompress) && (!dontCompress)) {
-			// Try to compress the data.
-			// Try each algorithm, starting with the fastest and weakest.
-			// Stop when run out of algorithms, or the compressed data fits in a single block.
-			int algos = Compressor.countCompressAlgorithms();
-			try {
-				for(int i=0;i<algos;i++) {
-					Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
-					Bucket result;
-					result = comp.compress(origData, ctx.bf, Long.MAX_VALUE);
-					if(result.size() < blockSize) {
-						bestCodec = comp;
-						data = result;
-						if(bestCompressedData != null)
-							ctx.bf.freeBucket(bestCompressedData);
-						bestCompressedData = data;
-						break;
-					}
-					if(bestCompressedData != null && result.size() < bestCompressedData.size()) {
-						ctx.bf.freeBucket(bestCompressedData);
-						bestCompressedData = result;
-						data = result;
-						bestCodec = comp;
-					} else if(bestCompressedData == null && result.size() < data.size()) {
-						bestCompressedData = result;
-						bestCodec = comp;
-						data = result;
-					}
-				}
-			} catch (IOException e) {
-				throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-			} catch (CompressionOutputSizeException e) {
-				// Impossible
-				throw new Error(e);
-			}
-		}
-
- // Compressed data
-
- // Insert it...
-		short codecNumber = bestCodec == null ? -1 : bestCodec.codecNumberForMetadata();
-
-		if(block.getData().size() > Integer.MAX_VALUE)
-			throw new InserterException(InserterException.INTERNAL_ERROR, "2GB+ should not encode to one block!", null);
-
-		if((block.clientMetadata == null || block.clientMetadata.isTrivial())) {
-			if(data.size() < blockSize) {
-				// Just insert it
-				SingleBlockInserter bi =
-					new SingleBlockInserter(parent, data, codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(), -1, getCHKOnly);
-				bi.schedule();
-				cb.onTransition(this, bi);
-				return;
-			}
-		}
-		if(data.size() < ClientCHKBlock.MAX_COMPRESSED_DATA_LENGTH) {
-			// Insert single block, then insert pointer to it
-			if(reportMetadataOnly) {
-				SingleBlockInserter dataPutter =
-					new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, cb, metadata, (int)origSize, -1, getCHKOnly);
-				Metadata meta = new Metadata(Metadata.SIMPLE_REDIRECT, dataPutter.getURI(), block.clientMetadata);
-				cb.onMetadata(meta, this);
-				cb.onTransition(this, dataPutter);
-				dataPutter.schedule();
-			} else {
-				MultiPutCompletionCallback mcb = new MultiPutCompletionCallback(cb, parent);
-				SingleBlockInserter dataPutter =
-					new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, mcb, metadata, (int)origSize, -1, getCHKOnly);
-				Metadata meta = new Metadata(Metadata.SIMPLE_REDIRECT, dataPutter.getURI(), block.clientMetadata);
-				Bucket metadataBucket;
-				try {
-					metadataBucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
-				} catch (IOException e) {
-					throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-				}
-				SingleBlockInserter metaPutter =
-					new SingleBlockInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, true, (int)origSize, -1, getCHKOnly);
-				mcb.addURIGenerator(metaPutter);
-				mcb.add(dataPutter);
-				cb.onTransition(this, mcb);
-				mcb.arm();
-				dataPutter.schedule();
-				metaPutter.schedule();
-			}
-			return;
-		}
- // Otherwise the file is too big to fit into one block
- // We therefore must make a splitfile
- // Job of SplitHandler: when the splitinserter has the metadata,
- // insert it. Then when the splitinserter has finished, and the
- // metadata insert has finished too, tell the master callback.
-		if(reportMetadataOnly) {
-			SplitFileInserter sfi = new SplitFileInserter(parent, cb, data, bestCodec, block.clientMetadata, ctx, getCHKOnly, metadata);
-			cb.onTransition(this, sfi);
-			sfi.start();
-		} else {
-			SplitHandler sh = new SplitHandler();
-			SplitFileInserter sfi = new SplitFileInserter(parent, sh, data, bestCodec, block.clientMetadata, ctx, getCHKOnly, metadata);
-			sh.sfi = sfi;
-			cb.onTransition(this, sh);
-			sfi.start();
-		}
- return;
- }
-
- /**
- * When we get the metadata, start inserting it to our target key.
- * When we have inserted both the metadata and the splitfile,
- * call the master callback.
- */
- class SplitHandler implements PutCompletionCallback, ClientPutState {
-
- ClientPutState sfi;
- ClientPutState metadataPutter;
- boolean finished = false;
- boolean splitInsertSuccess = false;
- boolean metaInsertSuccess = false;
-
-		public synchronized void onTransition(ClientPutState oldState, ClientPutState newState) {
- if(oldState == sfi)
- sfi = newState;
- if(oldState == metadataPutter)
- metadataPutter = newState;
- }
-
-		public void onSuccess(ClientPutState state) {
-			Logger.minor(this, "onSuccess("+state+")");
-			synchronized(this) {
-				if(finished) return;
-				if(state == sfi) {
-					Logger.minor(this, "Splitfile insert succeeded");
-					splitInsertSuccess = true;
-				} else if(state == metadataPutter) {
-					Logger.minor(this, "Metadata insert succeeded");
-					metaInsertSuccess = true;
-				} else {
-					Logger.error(this, "Unknown: "+state);
-				}
-				if(splitInsertSuccess && metaInsertSuccess) {
-					Logger.minor(this, "Both succeeded");
-					finished = true;
-				} else
-					return;
-			}
-			cb.onSuccess(this);
-		}
-
-		public synchronized void onFailure(InserterException e, ClientPutState state) {
-			if(finished) return;
-			fail(e);
-		}
-
-		public void onMetadata(Metadata meta, ClientPutState state) {
-			if(finished) return;
-			if(state == metadataPutter) {
-				Logger.error(this, "Got metadata for metadata");
-				// FIXME kill?
-			} else if(state != sfi) {
-				Logger.error(this, "Got unknown metadata");
-				// FIXME kill?
-			}
-			if(reportMetadataOnly) {
-				cb.onMetadata(meta, this);
-				metaInsertSuccess = true;
-			} else {
-				synchronized(this) {
-					Bucket metadataBucket;
-					try {
-						metadataBucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
-					} catch (IOException e) {
-						InserterException ex = new InserterException(InserterException.BUCKET_ERROR, e, null);
-						fail(ex);
-						return;
-					}
-					InsertBlock newBlock = new InsertBlock(metadataBucket, null, block.desiredURI);
-					try {
-						// Trailing flags are (dontCompress, dontTellParent, getCHKOnly, reportMetadataOnly);
-						// getCHKOnly belongs in its own slot, not in dontTellParent as before.
-						metadataPutter = new SingleFileInserter(parent, this, newBlock, true, ctx, false, false, getCHKOnly, false);
-						Logger.minor(this, "Putting metadata on "+metadataPutter);
-					} catch (InserterException e) {
-						cb.onFailure(e, this);
-						return;
-					}
-				}
-				try {
-					((SingleFileInserter)metadataPutter).start();
-				} catch (InserterException e) {
-					fail(e);
-					return;
-				}
-			}
-		}
-
- private synchronized void fail(InserterException e) {
- Logger.minor(this, "Failing: "+e, e);
- if(finished) return;
- finished = true;
- cb.onFailure(e, this);
- }
-
- public BaseClientPutter getParent() {
- return parent;
- }
-
- public void onEncode(ClientKey key, ClientPutState state) {
- if(state == metadataPutter)
- cb.onEncode(key, this);
- }
-
- public void cancel() {
- if(sfi != null)
- sfi.cancel();
- if(metadataPutter != null)
- metadataPutter.cancel();
- }
-
- }
-
- public BaseClientPutter getParent() {
- return parent;
- }
-
- public void cancel() {
- cancelled = true;
- }
-}
Copied: trunk/freenet/src/freenet/client/async/SingleFileInserter.java (from rev 7929, branches/async-client/src/freenet/client/async/SingleFileInserter.java)
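tryCompress() above tries each codec from fastest/weakest to slowest/strongest, keeps the smallest result so far, and stops early once something fits in a single block. A runnable sketch of that selection loop follows, using JDK Deflater levels in place of Freenet's pluggable codec list; illustrative, not Freenet API.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.Deflater;
    import java.util.zip.DeflaterOutputStream;

    final class BestCompression {
        // Try codecs cheapest-first; keep the smallest output; stop early
        // as soon as one candidate fits in a single block.
        static byte[] compressToFit(byte[] input, int blockSize) throws IOException {
            byte[] best = input;
            int[] levels = { Deflater.BEST_SPEED, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION };
            for (int level : levels) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                try (DeflaterOutputStream out = new DeflaterOutputStream(buf, new Deflater(level))) {
                    out.write(input);
                }
                byte[] candidate = buf.toByteArray();
                if (candidate.length < best.length)
                    best = candidate;
                if (best.length < blockSize)
                    break; // fits in one block; stronger codecs can't help further
            }
            return best;
        }
    }

Trying weakest-first is a latency/CPU trade: most data either compresses easily or not at all, so the expensive codecs rarely need to run.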
Deleted: trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcher.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcher.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,249 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.LinkedList;
-
-import freenet.client.ArchiveContext;
-import freenet.client.ClientMetadata;
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-import freenet.client.FetcherContext;
-import freenet.client.Metadata;
-import freenet.client.MetadataParseException;
-import freenet.client.events.SplitfileProgressEvent;
-import freenet.keys.FreenetURI;
-import freenet.keys.NodeCHK;
-import freenet.support.Bucket;
-import freenet.support.Fields;
-import freenet.support.Logger;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-
-/**
- * Fetch a splitfile, decompress it if need be, and return it to the GetCompletionCallback.
- * Most of the work is done by the segments, and we do not need a thread.
- */
-public class SplitFileFetcher extends ClientGetState {
-
- final FetcherContext fetchContext;
- final ArchiveContext archiveContext;
- final LinkedList decompressors;
- final ClientMetadata clientMetadata;
- final ClientGetter parent;
- final GetCompletionCallback cb;
- final int recursionLevel;
- /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
- final short splitfileType;
-	/** The segment length. -1 means not segmented and must get everything to decode. */
- final int blocksPerSegment;
- /** The segment length in check blocks. */
- final int checkBlocksPerSegment;
- /** Total number of segments */
- final int segmentCount;
- /** The detailed information on each segment */
- final SplitFileFetcherSegment[] segments;
- /** The splitfile data blocks. */
- final FreenetURI[] splitfileDataBlocks;
- /** The splitfile check blocks. */
- final FreenetURI[] splitfileCheckBlocks;
- /** Maximum temporary length */
- final long maxTempLength;
- /** Have all segments finished? Access synchronized. */
- private boolean allSegmentsFinished = false;
-	/** Override length. If this is positive, truncate the splitfile to this length. */
- private final long overrideLength;
- /** Accept non-full splitfile chunks? */
- private final boolean splitUseLengths;
- private boolean finished;
-
-	public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb, ClientGetter parent,
-			FetcherContext newCtx, LinkedList decompressors, ClientMetadata clientMetadata,
-			ArchiveContext actx, int recursionLevel) throws FetchException, MetadataParseException {
- this.finished = false;
- this.fetchContext = newCtx;
- this.archiveContext = actx;
- this.decompressors = decompressors;
- this.clientMetadata = clientMetadata;
- this.cb = rcb;
- this.recursionLevel = recursionLevel + 1;
- this.parent = parent;
- if(parent.isCancelled())
- throw new FetchException(FetchException.CANCELLED);
- overrideLength = metadata.dataLength();
- this.splitfileType = metadata.getSplitfileType();
- splitfileDataBlocks = metadata.getSplitfileDataKeys();
- splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
- splitUseLengths = metadata.splitUseLengths();
- int blockLength = splitUseLengths ? -1 : NodeCHK.BLOCK_SIZE;
-		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
-			// Don't need to do much - just fetch everything and piece it together.
-			blocksPerSegment = -1;
-			checkBlocksPerSegment = -1;
-			segmentCount = 1;
-		} else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
-			byte[] params = metadata.splitfileParams();
-			if(params == null || params.length < 8)
-				throw new MetadataParseException("No splitfile params");
-			blocksPerSegment = Fields.bytesToInt(params, 0);
-			checkBlocksPerSegment = Fields.bytesToInt(params, 4);
-			if(blocksPerSegment > fetchContext.maxDataBlocksPerSegment
-					|| checkBlocksPerSegment > fetchContext.maxCheckBlocksPerSegment)
-				throw new FetchException(FetchException.TOO_MANY_BLOCKS_PER_SEGMENT, "Too many blocks per segment: "+blocksPerSegment+" data, "+checkBlocksPerSegment+" check");
-			segmentCount = (splitfileDataBlocks.length / blocksPerSegment) +
-				(splitfileDataBlocks.length % blocksPerSegment == 0 ? 0 : 1);
-			// Onion, 128/192.
-			// Will be segmented.
-		} else throw new MetadataParseException("Unknown splitfile format: "+splitfileType);
-		this.maxTempLength = fetchContext.maxTempLength;
-		Logger.minor(this, "Algorithm: "+splitfileType+", blocks per segment: "+blocksPerSegment+", check blocks per segment: "+checkBlocksPerSegment+", segments: "+segmentCount);
-		segments = new SplitFileFetcherSegment[segmentCount]; // initially null on all entries
-		if(segmentCount == 1) {
-			segments[0] = new SplitFileFetcherSegment(splitfileType, splitfileDataBlocks, splitfileCheckBlocks, this, archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel);
-		} else {
-			int dataBlocksPtr = 0;
-			int checkBlocksPtr = 0;
-			for(int i=0;i<segments.length;i++) {
-				// Create a segment. Give it its keys.
-				int copyDataBlocks = Math.min(splitfileDataBlocks.length - dataBlocksPtr, blocksPerSegment);
-				int copyCheckBlocks = Math.min(splitfileCheckBlocks.length - checkBlocksPtr, checkBlocksPerSegment);
-				FreenetURI[] dataBlocks = new FreenetURI[copyDataBlocks];
-				FreenetURI[] checkBlocks = new FreenetURI[copyCheckBlocks];
-				if(copyDataBlocks > 0)
-					System.arraycopy(splitfileDataBlocks, dataBlocksPtr, dataBlocks, 0, copyDataBlocks);
-				if(copyCheckBlocks > 0)
-					System.arraycopy(splitfileCheckBlocks, checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
-				dataBlocksPtr += copyDataBlocks;
-				checkBlocksPtr += copyCheckBlocks;
-				segments[i] = new SplitFileFetcherSegment(splitfileType, dataBlocks, checkBlocks, this, archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel+1);
-			}
-		}
-	}
-
- /** Return the final status of the fetch. Throws an exception, or
returns a
- * Bucket containing the fetched data.
- * @throws FetchException If the fetch failed for some reason.
- */
- private Bucket finalStatus() throws FetchException {
- long finalLength = 0;
- for(int i=0;i<segments.length;i++) {
- SplitFileFetcherSegment s = segments[i];
- if(!s.isFinished()) throw new
IllegalStateException("Not all finished");
- s.throwError();
- // If still here, it succeeded
- finalLength += s.decodedLength();
- // Healing is done by Segment
- }
- if(finalLength > overrideLength)
- finalLength = overrideLength;
-
- long bytesWritten = 0;
- OutputStream os = null;
- Bucket output;
- try {
- output =
fetchContext.bucketFactory.makeBucket(finalLength);
- os = output.getOutputStream();
- for(int i=0;i<segments.length;i++) {
- SplitFileFetcherSegment s = segments[i];
- long max = (finalLength < 0 ? 0 : (finalLength
- bytesWritten));
- bytesWritten += s.writeDecodedDataTo(os, max);
- }
- } catch (IOException e) {
- throw new FetchException(FetchException.BUCKET_ERROR,
e);
- } finally {
- if(os != null) {
- try {
- os.close();
- } catch (IOException e) {
- // If it fails to close it may return
corrupt data.
- throw new
FetchException(FetchException.BUCKET_ERROR, e);
- }
- }
- }
- return output;
- }
-
- public void segmentFinished(SplitFileFetcherSegment segment) {
- Logger.minor(this, "Finished segment: "+segment);
- synchronized(this) {
- boolean allDone = true;
- for(int i=0;i<segments.length;i++)
- if(!segments[i].isFinished()) {
- Logger.minor(this, "Segment
"+segments[i]+" is not finished");
- allDone = false;
- }
- if(allDone) {
- if(allSegmentsFinished)
- Logger.error(this, "Was already
finished! (segmentFinished("+segment+")");
- else {
- allSegmentsFinished = true;
- finish();
- }
- }
- notifyAll();
- }
- }
-
- private void finish() {
- try {
- synchronized(this) {
- if(finished) {
- Logger.error(this, "Was already
finished");
- return;
- }
- finished = true;
- }
- Bucket data = finalStatus();
- // Decompress
- while(!decompressors.isEmpty()) {
- Compressor c = (Compressor)
decompressors.removeLast();
- try {
- data = c.decompress(data,
fetchContext.bucketFactory, Math.max(fetchContext.maxTempLength,
fetchContext.maxOutputLength));
- } catch (IOException e) {
- cb.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), this);
- return;
- } catch (CompressionOutputSizeException e) {
- cb.onFailure(new
FetchException(FetchException.TOO_BIG, e), this);
- return;
- }
- }
- cb.onSuccess(new FetchResult(clientMetadata, data),
this);
- } catch (FetchException e) {
- cb.onFailure(e, this);
- }
- }
-
- public ClientGetter getParent() {
- return parent;
- }
-
- public void onProgress() {
- int totalBlocks = splitfileDataBlocks.length;
- int fetchedBlocks = 0;
- int failedBlocks = 0;
- int fatallyFailedBlocks = 0;
- int runningBlocks = 0;
- for(int i=0;i<segments.length;i++) {
- SplitFileFetcherSegment segment = segments[i];
- Logger.minor(this, "Segment: "+segment+":
fetched="+segment.fetchedBlocks()+", failedBlocks: "+segment.failedBlocks()+
- ", fatally:
"+segment.fatallyFailedBlocks()+", running: "+segment.runningBlocks());
- fetchedBlocks += segment.fetchedBlocks();
- failedBlocks += segment.failedBlocks();
- fatallyFailedBlocks += segment.fatallyFailedBlocks();
- runningBlocks += segment.runningBlocks();
- }
- fetchContext.eventProducer.produceEvent(new
SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks,
fatallyFailedBlocks, runningBlocks));
- }
-
- public void schedule() {
- for(int i=0;i<segments.length;i++) {
- segments[i].schedule();
- }
- }
-
- public void cancel() {
- for(int i=0;i<segments.length;i++)
- segments[i].cancel();
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/SplitFileFetcher.java (from rev 7929, branches/async-client/src/freenet/client/async/SplitFileFetcher.java)
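The SplitFileFetcher constructor above computes segmentCount by ceiling division and then slices the key arrays with System.arraycopy, letting Math.min shorten the final segment. The same arithmetic in isolation; illustrative, not Freenet API.

    final class Segmenter {
        // Ceiling division for the segment count; Math.min handles the
        // short final segment, as in SplitFileFetcher's constructor.
        static int[][] segment(int[] keys, int perSegment) {
            int count = keys.length / perSegment + (keys.length % perSegment == 0 ? 0 : 1);
            int[][] segments = new int[count][];
            int ptr = 0;
            for (int i = 0; i < count; i++) {
                int copy = Math.min(keys.length - ptr, perSegment);
                segments[i] = new int[copy];
                System.arraycopy(keys, ptr, segments[i], 0, copy);
                ptr += copy;
            }
            return segments;
        }
    }

For example, 300 data keys at 128 per segment give ceil(300/128) = 3 segments of 128, 128 and 44 keys.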
Deleted: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java	2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,335 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-
-import freenet.client.ArchiveContext;
-import freenet.client.FECCodec;
-import freenet.client.FailureCodeTracker;
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-import freenet.client.FetcherContext;
-import freenet.client.Metadata;
-import freenet.client.MetadataParseException;
-import freenet.client.SplitfileBlock;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-
-/**
- * A single segment within a SplitFileFetcher.
- * This in turn controls a large number of SingleFileFetchers.
- */
-public class SplitFileFetcherSegment implements GetCompletionCallback {
-
- final short splitfileType;
- final FreenetURI[] dataBlocks;
- final FreenetURI[] checkBlocks;
- final SingleFileFetcher[] dataBlockStatus;
- final SingleFileFetcher[] checkBlockStatus;
- final MinimalSplitfileBlock[] dataBuckets;
- final MinimalSplitfileBlock[] checkBuckets;
- final int minFetched;
- final SplitFileFetcher parentFetcher;
- final ArchiveContext archiveContext;
- final FetcherContext fetcherContext;
- final long maxBlockLength;
- final boolean nonFullBlocksAllowed;
- /** Has the segment finished processing? Irreversible. */
- private boolean finished;
- private boolean decoded;
- /** Bucket to store the data retrieved, after it has been decoded */
- private Bucket decodedData;
- /** Fetch context for block fetches */
- final FetcherContext blockFetchContext;
- /** Recursion level */
- final int recursionLevel;
- private FetchException failureException;
- private int fatallyFailedBlocks;
- private int failedBlocks;
- private int fetchedBlocks;
-	// Must be initialized up front: onFailure() dereferences it unconditionally.
-	// (The boolean is assumed to distinguish insert (true) from fetch (false) trackers.)
-	private final FailureCodeTracker errors = new FailureCodeTracker(false);
-
-	public SplitFileFetcherSegment(short splitfileType, FreenetURI[] splitfileDataBlocks, FreenetURI[] splitfileCheckBlocks,
-			SplitFileFetcher fetcher, ArchiveContext archiveContext, FetcherContext fetchContext, long maxTempLength,
-			boolean splitUseLengths, int recursionLevel) throws MetadataParseException, FetchException {
-		this.parentFetcher = fetcher;
-		this.archiveContext = archiveContext;
-		this.splitfileType = splitfileType;
-		dataBlocks = splitfileDataBlocks;
-		checkBlocks = splitfileCheckBlocks;
-		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
-			minFetched = dataBlocks.length;
-		} else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
-			minFetched = dataBlocks.length;
-		} else throw new MetadataParseException("Unknown splitfile type "+splitfileType);
-		finished = false;
-		decodedData = null;
-		dataBlockStatus = new SingleFileFetcher[dataBlocks.length];
-		checkBlockStatus = new SingleFileFetcher[checkBlocks.length];
-		dataBuckets = new MinimalSplitfileBlock[dataBlocks.length];
-		checkBuckets = new MinimalSplitfileBlock[checkBlocks.length];
-		for(int i=0;i<dataBuckets.length;i++) {
-			dataBuckets[i] = new MinimalSplitfileBlock(i);
-		}
-		for(int i=0;i<checkBuckets.length;i++)
-			checkBuckets[i] = new MinimalSplitfileBlock(i+dataBuckets.length);
-		nonFullBlocksAllowed = splitUseLengths;
-		this.fetcherContext = fetchContext;
-		maxBlockLength = maxTempLength;
-		if(splitUseLengths) {
-			blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
-			this.recursionLevel = recursionLevel + 1;
-		} else {
-			blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
-			this.recursionLevel = 0;
-		}
-	}
-
- public boolean isFinished() {
- return finished;
- }
-
- /** Throw a FetchException, if we have one. Else do nothing. */
- public synchronized void throwError() throws FetchException {
- if(failureException != null)
- throw failureException;
- }
-
- /** Decoded length? */
- public long decodedLength() {
- return decodedData.size();
- }
-
- /** Write the decoded segment's data to an OutputStream */
-	public long writeDecodedDataTo(OutputStream os, long truncateLength) throws IOException {
-		long len = decodedData.size();
-		if(truncateLength >= 0 && truncateLength < len)
-			len = truncateLength;
-		// Copy exactly the computed length; Math.min(truncateLength, size)
-		// would pass -1 through when no truncation was requested.
-		BucketTools.copyTo(decodedData, os, len);
-		return len;
-	}
-
- /** How many blocks have failed due to running out of retries? */
- public synchronized int failedBlocks() {
- return failedBlocks;
- }
-
- /** How many blocks have been successfully fetched? */
- public synchronized int fetchedBlocks() {
- return fetchedBlocks;
- }
-
- /** How many blocks have currently running requests? */
- public int runningBlocks() {
- // FIXME implement or throw out
- return 0;
- }
-
- /** How many blocks failed permanently due to fatal errors? */
- public int fatallyFailedBlocks() {
- return fatallyFailedBlocks;
- }
-
-	public synchronized void onSuccess(FetchResult result, ClientGetState state) {
-		if(finished) return;
-		Integer token = (Integer) ((SingleFileFetcher)state).getToken();
-		int blockNo = token.intValue();
-		if(blockNo < dataBlocks.length) {
-			if(dataBlocks[blockNo] == null) {
-				Logger.error(this, "Block already finished: "+blockNo);
-				return;
-			}
-			dataBlocks[blockNo] = null;
-			dataBuckets[blockNo].setData(result.asBucket());
-		} else if(blockNo < checkBlocks.length + dataBlocks.length) {
-			blockNo -= dataBlocks.length;
-			if(checkBlocks[blockNo] == null) {
-				Logger.error(this, "Check block already finished: "+blockNo);
-				return;
-			}
-			checkBlocks[blockNo] = null;
-			checkBuckets[blockNo].setData(result.asBucket());
-		} else
-			Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
-		fetchedBlocks++;
-		if(fetchedBlocks >= minFetched)
-			startDecode();
-	}
-
- private void startDecode() {
- synchronized(this) {
- if(decoded) return;
- decoded = true;
- }
- Runnable r = new Decoder();
- Thread t = new Thread(r, "Decoder for "+this);
- t.setDaemon(true);
- t.start();
- }
-
-	class Decoder implements Runnable {
-
-		public void run() {
-
-			// Now decode
-			Logger.minor(this, "Decoding "+this);
-
-			FECCodec codec = FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
-			try {
-				if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
-					// FIXME hardcoded block size below.
-					codec.decode(dataBuckets, checkBuckets, 32768, fetcherContext.bucketFactory);
-					// Now have all the data blocks (not necessarily all the check blocks)
-				}
-
-				decodedData = fetcherContext.bucketFactory.makeBucket(-1);
-				Logger.minor(this, "Copying data from data blocks");
-				OutputStream os = decodedData.getOutputStream();
-				for(int i=0;i<dataBlockStatus.length;i++) {
-					SplitfileBlock status = dataBuckets[i];
-					Bucket data = status.getData();
-					BucketTools.copyTo(data, os, Long.MAX_VALUE);
-				}
-				Logger.minor(this, "Copied data");
-				os.close();
-				// Must set finished BEFORE calling parentFetcher.
-				// Otherwise a race is possible that might result in it not seeing our finishing.
-				finished = true;
-				parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
-			} catch (IOException e) {
-				Logger.minor(this, "Caught bucket error?: "+e, e);
-				finished = true;
-				failureException = new FetchException(FetchException.BUCKET_ERROR);
-				parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
-				return;
-			}
-
-			// Now heal
-
-			// Encode any check blocks we don't have
-			if(codec != null) {
-				try {
-					codec.encode(dataBuckets, checkBuckets, 32768, fetcherContext.bucketFactory);
-				} catch (IOException e) {
-					Logger.error(this, "Bucket error while healing: "+e, e);
-				}
-			}
-
-			// Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
-			for(int i=0;i<dataBlockStatus.length;i++) {
-				if(dataBuckets[i].getData() != null) continue;
-				SingleFileFetcher fetcher = dataBlockStatus[i];
-				if(fetcher.getRetryCount() == 0) {
-					// 80% chance of not inserting, if we never tried it
-					if(fetcherContext.random.nextInt(5) == 0) continue;
-				}
-				queueHeal(dataBuckets[i].getData());
-			}
-			for(int i=0;i<checkBlockStatus.length;i++) {
-				if(checkBuckets[i].getData() != null) continue;
-				SingleFileFetcher fetcher = checkBlockStatus[i];
-				if(fetcher.getRetryCount() == 0) {
-					// 80% chance of not inserting, if we never tried it
-					if(fetcherContext.random.nextInt(5) == 0) continue;
-				}
-				queueHeal(checkBuckets[i].getData());
-			}
-
-			for(int i=0;i<dataBlocks.length;i++) {
-				dataBuckets[i] = null;
-				dataBlockStatus[i] = null;
-				dataBlocks[i] = null;
-			}
-			for(int i=0;i<checkBlocks.length;i++) {
-				checkBuckets[i] = null;
-				checkBlockStatus[i] = null;
-				checkBlocks[i] = null;
-			}
-		}
-
-	}
-
- private void queueHeal(Bucket data) {
- // TODO Auto-generated method stub
-
- }
-
-	/** This is after any retries and therefore is either out-of-retries or fatal */
-	public synchronized void onFailure(FetchException e, ClientGetState state) {
-		Integer token = (Integer) ((SingleFileFetcher)state).getToken();
-		int blockNo = token.intValue();
-		if(blockNo < dataBlocks.length) {
-			if(dataBlocks[blockNo] == null) {
-				Logger.error(this, "Block already finished: "+blockNo);
-				return;
-			}
-			dataBlocks[blockNo] = null;
-		} else if(blockNo < dataBlocks.length + checkBlocks.length) {
-			// Bound must include the data-block offset, matching onSuccess().
-			if(checkBlocks[blockNo-dataBlocks.length] == null) {
-				Logger.error(this, "Block already finished: "+blockNo);
-				return;
-			}
-			checkBlocks[blockNo-dataBlocks.length] = null;
-		} else
-			Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
-		// :(
-		Logger.minor(this, "Permanently failed: "+state+" on "+this);
-		if(e.isFatal())
-			fatallyFailedBlocks++;
-		else
-			failedBlocks++;
-		// FIXME this may not be accurate across all the retries?
-		if(e.errorCodes != null)
-			errors.merge(e.errorCodes);
-		else
-			errors.inc(new Integer(e.mode), ((SingleFileFetcher)state).getRetryCount());
-		if(failedBlocks + fatallyFailedBlocks > (dataBlocks.length + checkBlocks.length - minFetched)) {
-			fail(new FetchException(FetchException.SPLITFILE_ERROR, errors));
-		}
-	}
-
-    private void fail(FetchException e) {
-        synchronized(this) {
-            if(finished) return;
-            finished = true;
-            this.failureException = e;
-        }
-        for(int i=0;i<dataBlockStatus.length;i++) {
-            SingleFileFetcher f = dataBlockStatus[i];
-            if(f != null)
-                f.cancel();
-        }
-        for(int i=0;i<checkBlockStatus.length;i++) {
-            SingleFileFetcher f = checkBlockStatus[i];
-            if(f != null)
-                f.cancel();
-        }
-        parentFetcher.segmentFinished(this);
-    }
-
-    public void schedule() {
-        try {
-            for(int i=0;i<dataBlocks.length;i++) {
-                dataBlockStatus[i] = new SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i], blockFetchContext, archiveContext, blockFetchContext.maxSplitfileBlockRetries, recursionLevel, true, new Integer(i));
-                dataBlockStatus[i].schedule();
-            }
-            for(int i=0;i<checkBlocks.length;i++) {
-                checkBlockStatus[i] = new SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i], blockFetchContext, archiveContext, blockFetchContext.maxSplitfileBlockRetries, recursionLevel, true, new Integer(dataBlocks.length+i));
-                checkBlockStatus[i].schedule();
-            }
-        } catch (MalformedURLException e) {
-            // Invalidates the whole splitfile
-            fail(new FetchException(FetchException.INVALID_URI, "Invalid URI in splitfile"));
-        } catch (Throwable t) {
-            fail(new FetchException(FetchException.INVALID_URI, t));
-        }
-    }
-
-    public void cancel() {
-        fail(new FetchException(FetchException.CANCELLED));
-    }
-
-}
Copied: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java (from rev 7929, branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java)
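
To make the decode-then-heal flow in the deleted segment code above easier to follow, here is a minimal, self-contained sketch that uses a single XOR parity block in place of the real FEC codec. All names are illustrative and not the Freenet API:

    import java.util.Arrays;

    public class DecodeHealSketch {
        // Reconstruct one missing data block from the others plus the parity block.
        static byte[] decodeMissing(byte[][] dataBlocks, byte[] parity, int missing) {
            byte[] out = parity.clone();
            for (int i = 0; i < dataBlocks.length; i++) {
                if (i == missing) continue;
                for (int j = 0; j < out.length; j++) out[j] ^= dataBlocks[i][j];
            }
            return out;
        }

        public static void main(String[] args) {
            byte[][] data = { "AAAA".getBytes(), "BBBB".getBytes(), "CCCC".getBytes() };
            byte[] parity = new byte[4];
            for (byte[] d : data) for (int j = 0; j < 4; j++) parity[j] ^= d[j];
            byte[] lost = data[1];           // pretend block 1 failed to fetch
            data[1] = null;
            byte[] healed = decodeMissing(data, parity, 1);
            data[1] = healed;                // the decoded segment is complete again
            System.out.println(Arrays.equals(healed, lost)); // true
            // The healed block is what queueHeal() would re-insert into the network.
        }
    }
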
Deleted: trunk/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileInserter.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserter.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,268 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-import java.util.Vector;
-
-import freenet.client.ClientMetadata;
-import freenet.client.FECCodec;
-import freenet.client.FailureCodeTracker;
-import freenet.client.InserterContext;
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.client.async.SingleFileInserter.SplitHandler;
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.compress.Compressor;
-
-public class SplitFileInserter implements ClientPutState {
-
- final BaseClientPutter parent;
- final InserterContext ctx;
- final PutCompletionCallback cb;
- final long dataLength;
- final short compressionCodec;
- final short splitfileAlgorithm;
- final int segmentSize;
- final int checkSegmentSize;
- final SplitFileInserterSegment[] segments;
- final boolean getCHKOnly;
- final int countCheckBlocks;
- final int countDataBlocks;
- private boolean haveSentMetadata;
- final ClientMetadata cm;
- final boolean isMetadata;
- private boolean finished;
-
-    public SplitFileInserter(BaseClientPutter put, PutCompletionCallback cb, Bucket data, Compressor bestCodec, ClientMetadata clientMetadata, InserterContext ctx, boolean getCHKOnly, boolean isMetadata) throws InserterException {
- this.parent = put;
- this.finished = false;
- this.isMetadata = isMetadata;
- this.cm = clientMetadata;
- this.getCHKOnly = getCHKOnly;
- this.cb = cb;
- this.ctx = ctx;
- Bucket[] dataBuckets;
-        try {
-            dataBuckets = BucketTools.split(data, ClientCHKBlock.DATA_LENGTH, ctx.bf);
-        } catch (IOException e) {
-            throw new InserterException(InserterException.BUCKET_ERROR, e, null);
-        }
- countDataBlocks = dataBuckets.length;
- // Encoding is done by segments
- if(bestCodec == null)
- compressionCodec = -1;
- else
- compressionCodec = bestCodec.codecNumberForMetadata();
- this.splitfileAlgorithm = ctx.splitfileAlgorithm;
- this.dataLength = data.size();
- segmentSize = ctx.splitfileSegmentDataBlocks;
-        checkSegmentSize = splitfileAlgorithm == Metadata.SPLITFILE_NONREDUNDANT ? 0 : ctx.splitfileSegmentCheckBlocks;
-
- // Create segments
- segments = splitIntoSegments(segmentSize, dataBuckets);
- int count = 0;
- for(int i=0;i<segments.length;i++)
- count += segments[i].countCheckBlocks();
- countCheckBlocks = count;
- }
-
- /**
- * Group the blocks into segments.
- */
-    private SplitFileInserterSegment[] splitIntoSegments(int segmentSize, Bucket[] origDataBlocks) {
- int dataBlocks = origDataBlocks.length;
-
- Vector segs = new Vector();
-
-        FECCodec codec = FECCodec.getCodec(splitfileAlgorithm, origDataBlocks.length);
-
- // First split the data up
- if(dataBlocks < segmentSize || segmentSize == -1) {
- // Single segment
-            SplitFileInserterSegment onlySeg = new SplitFileInserterSegment(this, codec, origDataBlocks, ctx, getCHKOnly, 0);
-            segs.add(onlySeg);
- } else {
- int j = 0;
- int segNo = 0;
- for(int i=segmentSize;;i+=segmentSize) {
- if(i > dataBlocks) i = dataBlocks;
- Bucket[] seg = new Bucket[i-j];
-                System.arraycopy(origDataBlocks, j, seg, 0, i-j);
- j = i;
-                for(int x=0;x<seg.length;x++)
-                    if(seg[x] == null) throw new NullPointerException("In splitIntoSegs: "+x+" is null of "+seg.length+" of "+segNo);
-                SplitFileInserterSegment s = new SplitFileInserterSegment(this, codec, seg, ctx, getCHKOnly, segNo);
- segs.add(s);
-
- if(i == dataBlocks) break;
- segNo++;
- }
- }
-        return (SplitFileInserterSegment[]) segs.toArray(new SplitFileInserterSegment[segs.size()]);
- }
-
- public void start() throws InserterException {
- for(int i=0;i<segments.length;i++)
- segments[i].start();
- }
-
- public void encodedSegment(SplitFileInserterSegment segment) {
-        Logger.minor(this, "Encoded segment "+segment.segNo+" of "+this);
- }
-
- public void segmentHasURIs(SplitFileInserterSegment segment) {
- if(haveSentMetadata) {
- Logger.error(this, "WTF? Already sent metadata");
- return;
- }
-
- boolean allHaveURIs = true;
- synchronized(this) {
- for(int i=0;i<segments.length;i++) {
- if(!segments[i].isEncoded())
- allHaveURIs = false;
- }
- }
-
- if(allHaveURIs) {
- Logger.minor(this, "Have URIs from all segments");
- boolean missingURIs;
- Metadata m = null;
- synchronized(this) {
- // Create metadata
- FreenetURI[] dataURIs = getDataURIs();
- FreenetURI[] checkURIs = getCheckURIs();
-
-            Logger.minor(this, "Data URIs: "+dataURIs.length+", check URIs: "+checkURIs.length);
-
-            missingURIs = anyNulls(dataURIs) || anyNulls(checkURIs);
-
- if(!missingURIs) {
- // Create Metadata
-                m = new Metadata(splitfileAlgorithm, dataURIs, checkURIs, segmentSize, checkSegmentSize, cm, dataLength, compressionCodec, isMetadata);
- }
- haveSentMetadata = true;
- }
- if(missingURIs) {
- Logger.minor(this, "Missing URIs");
- // Error
-            fail(new InserterException(InserterException.INTERNAL_ERROR, "Missing URIs after encoding", null));
- return;
- } else
- cb.onMetadata(m, this);
- }
-
- }
-
- private void fail(InserterException e) {
- synchronized(this) {
- if(finished) return;
- finished = true;
- }
- cb.onFailure(e, this);
- }
-
- // FIXME move this to somewhere
- private static boolean anyNulls(Object[] array) {
- for(int i=0;i<array.length;i++)
- if(array[i] == null) return true;
- return false;
- }
-
- private FreenetURI[] getCheckURIs() {
-        // Copy check block URIs from each segment into a FreenetURI[].
- FreenetURI[] uris = new FreenetURI[countCheckBlocks];
- int x = 0;
- for(int i=0;i<segments.length;i++) {
- FreenetURI[] segURIs = segments[i].getCheckURIs();
-            if(x + segURIs.length > countCheckBlocks)
-                throw new IllegalStateException("x="+x+", segURIs="+segURIs.length+", countCheckBlocks="+countCheckBlocks);
- System.arraycopy(segURIs, 0, uris, x, segURIs.length);
- x += segURIs.length;
- }
-
- if(uris.length != x)
- throw new IllegalStateException("Total is wrong");
-
- return uris;
- }
-
- private FreenetURI[] getDataURIs() {
-        // Copy data block URIs from each segment into a FreenetURI[].
- FreenetURI[] uris = new FreenetURI[countDataBlocks];
- int x = 0;
- for(int i=0;i<segments.length;i++) {
- FreenetURI[] segURIs = segments[i].getDataURIs();
-            if(x + segURIs.length > countDataBlocks)
-                throw new IllegalStateException("x="+x+", segURIs="+segURIs.length+", countDataBlocks="+countDataBlocks);
- System.arraycopy(segURIs, 0, uris, x, segURIs.length);
- x += segURIs.length;
- }
-
- if(uris.length != x)
- throw new IllegalStateException("Total is wrong");
-
- return uris;
- }
-
- public BaseClientPutter getParent() {
- return parent;
- }
-
- public void segmentFinished(SplitFileInserterSegment segment) {
- Logger.minor(this, "Segment finished: "+segment);
- boolean allGone = true;
- synchronized(this) {
- if(finished) return;
- for(int i=0;i<segments.length;i++)
- if(!segments[i].isFinished()) allGone = false;
-
- InserterException e = segment.getException();
- if(e != null && e.isFatal()) {
- cancel();
- } else {
- if(!allGone) return;
- }
- finished = true;
- }
- try {
- // Finished !!
- FailureCodeTracker tracker = new FailureCodeTracker(true);
- boolean allSucceeded = true;
- for(int i=0;i<segments.length;i++) {
- InserterException e = segments[i].getException();
- if(e == null) continue;
- allSucceeded = false;
- if(e.errorCodes != null)
- tracker.merge(e.errorCodes);
- tracker.inc(e.getMode());
- }
- if(allSucceeded)
- cb.onSuccess(this);
-            else {
-                if(tracker.isFatal(true))
-                    cb.onFailure(new InserterException(InserterException.FATAL_ERRORS_IN_BLOCKS, tracker, null), this);
-                else
-                    cb.onFailure(new InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, tracker, null), this);
-            }
- } catch (Throwable t) {
- // We MUST tell the parent *something*!
- Logger.error(this, "Caught "+t, t);
-            cb.onFailure(new InserterException(InserterException.INTERNAL_ERROR), this);
- }
- }
-
- public void cancel() {
- synchronized(this) {
- if(finished) return;
- finished = true;
- }
- for(int i=0;i<segments.length;i++)
- segments[i].cancel();
- }
-
-}
Copied: trunk/freenet/src/freenet/client/async/SplitFileInserter.java (from rev 7929, branches/async-client/src/freenet/client/async/SplitFileInserter.java)
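
The segmentation logic in splitIntoSegments() above is the part most worth internalizing. The following self-contained sketch reproduces just the slicing loop; String stands in for Bucket and the class name is illustrative:

    import java.util.ArrayList;
    import java.util.List;

    public class SegmentSplitSketch {
        // Group an array of blocks into segments of at most segmentSize entries,
        // mirroring the System.arraycopy loop above.
        static List<String[]> split(String[] blocks, int segmentSize) {
            List<String[]> segs = new ArrayList<>();
            if (blocks.length < segmentSize || segmentSize == -1) {
                segs.add(blocks); // single segment
                return segs;
            }
            int j = 0;
            for (int i = segmentSize; ; i += segmentSize) {
                if (i > blocks.length) i = blocks.length;
                String[] seg = new String[i - j];
                System.arraycopy(blocks, j, seg, 0, i - j);
                segs.add(seg);
                j = i;
                if (i == blocks.length) break;
            }
            return segs;
        }

        public static void main(String[] args) {
            String[] blocks = { "b0", "b1", "b2", "b3", "b4" };
            for (String[] seg : split(blocks, 2))
                System.out.println(seg.length); // 2, 2, 1
        }
    }
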
Deleted: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
---
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,237 +0,0 @@
-package freenet.client.async;
-
-import java.io.IOException;
-
-import freenet.client.FECCodec;
-import freenet.client.FailureCodeTracker;
-import freenet.client.InserterContext;
-import freenet.client.InserterException;
-import freenet.client.Metadata;
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientKey;
-import freenet.keys.FreenetURI;
-import freenet.support.Bucket;
-import freenet.support.Logger;
-
-public class SplitFileInserterSegment implements PutCompletionCallback {
-
- final SplitFileInserter parent;
- final FECCodec splitfileAlgo;
- final Bucket[] dataBlocks;
- final Bucket[] checkBlocks;
- final FreenetURI[] dataURIs;
- final FreenetURI[] checkURIs;
- final SingleBlockInserter[] dataBlockInserters;
- final SingleBlockInserter[] checkBlockInserters;
- final InserterContext blockInsertContext;
- final int segNo;
- private boolean encoded;
- private boolean finished;
- private boolean getCHKOnly;
- private InserterException toThrow;
- private final FailureCodeTracker errors;
- private int blocksGotURI;
- private int blocksCompleted;
-
-    public SplitFileInserterSegment(SplitFileInserter parent, FECCodec splitfileAlgo, Bucket[] origDataBlocks, InserterContext blockInsertContext, boolean getCHKOnly, int segNo) {
-        this.parent = parent;
-        this.getCHKOnly = getCHKOnly;
-        this.errors = new FailureCodeTracker(true);
- this.blockInsertContext = blockInsertContext;
- this.splitfileAlgo = splitfileAlgo;
- this.dataBlocks = origDataBlocks;
-        int checkBlockCount = splitfileAlgo == null ? 0 : splitfileAlgo.countCheckBlocks();
- checkBlocks = new Bucket[checkBlockCount];
- checkURIs = new FreenetURI[checkBlockCount];
- dataURIs = new FreenetURI[origDataBlocks.length];
- dataBlockInserters = new SingleBlockInserter[dataBlocks.length];
-        checkBlockInserters = new SingleBlockInserter[checkBlocks.length];
- this.segNo = segNo;
- }
-
- public void start() throws InserterException {
- for(int i=0;i<dataBlockInserters.length;i++) {
-            dataBlockInserters[i] = new SingleBlockInserter(parent.parent, dataBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, false, ClientCHKBlock.DATA_LENGTH, i, getCHKOnly);
- dataBlockInserters[i].schedule();
- }
- if(splitfileAlgo == null) {
- // Don't need to encode blocks
- } else {
- // Encode blocks
-            Thread t = new Thread(new EncodeBlocksRunnable(), "Blocks encoder");
- t.setDaemon(true);
- t.start();
- }
- }
-
- private class EncodeBlocksRunnable implements Runnable {
-
- public void run() {
- encode();
- }
- }
-
-    void encode() {
-        try {
-            splitfileAlgo.encode(dataBlocks, checkBlocks, ClientCHKBlock.DATA_LENGTH, blockInsertContext.bf);
-            // Success! Encoding is complete.
-            encoded = true;
-            parent.encodedSegment(this);
-            // Start the check block inserts
-            for(int i=0;i<checkBlockInserters.length;i++) {
-                checkBlockInserters[i] = new SingleBlockInserter(parent.parent, checkBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, false, ClientCHKBlock.DATA_LENGTH, i + dataBlocks.length, getCHKOnly);
-                checkBlockInserters[i].schedule();
-            }
-        } catch (IOException e) {
-            InserterException ex = new InserterException(InserterException.BUCKET_ERROR, e, null);
-            finish(ex);
-        } catch (Throwable t) {
-            InserterException ex = new InserterException(InserterException.INTERNAL_ERROR, t, null);
-            finish(ex);
-        }
-    }
-
- private void finish(InserterException ex) {
- synchronized(this) {
- if(finished) return;
- finished = true;
- toThrow = ex;
- }
- parent.segmentFinished(this);
- }
-
- private void finish() {
- synchronized(this) {
- if(finished) return;
- finished = true;
- }
- toThrow = InserterException.construct(errors);
- parent.segmentFinished(this);
- }
-
- public void onEncode(ClientKey key, ClientPutState state) {
- SingleBlockInserter sbi = (SingleBlockInserter)state;
- int x = sbi.token;
- synchronized(this) {
- if(x >= dataBlocks.length) {
-                if(checkURIs[x-dataBlocks.length] != null) {
-                    Logger.normal(this, "Got uri twice for check block "+x+" on "+this);
-                    return;
-                }
-                checkURIs[x-dataBlocks.length] = key.getURI();
- } else {
-                if(dataURIs[x] != null) {
-                    Logger.normal(this, "Got uri twice for data block "+x+" on "+this);
-                    return;
-                }
-                dataURIs[x] = key.getURI();
- }
- blocksGotURI++;
-            if(blocksGotURI != dataBlocks.length + checkBlocks.length) return;
- }
- // Double check
- for(int i=0;i<checkURIs.length;i++)
- if(checkURIs[i] == null) {
- Logger.error(this, "Check URI "+i+" is null");
- return;
- }
- for(int i=0;i<dataURIs.length;i++) {
- if(dataURIs[i] == null) {
- Logger.error(this, "Data URI "+i+" is null");
- return;
- }
- }
- parent.segmentHasURIs(this);
- }
-
- public void onSuccess(ClientPutState state) {
- SingleBlockInserter sbi = (SingleBlockInserter)state;
- int x = sbi.token;
- if(completed(x)) return;
- finish();
- }
-
- public void onFailure(InserterException e, ClientPutState state) {
- SingleBlockInserter sbi = (SingleBlockInserter)state;
- int x = sbi.token;
- errors.merge(e);
- if(completed(x)) return;
- finish();
- }
-
- private boolean completed(int x) {
- synchronized(this) {
- if(x >= dataBlocks.length) {
-                if(checkBlockInserters[x-dataBlocks.length] == null) {
-                    Logger.error(this, "Completed twice: check block "+x+" on "+this);
- return true;
- }
- checkBlockInserters[x-dataBlocks.length] = null;
- } else {
- if(dataBlockInserters[x] == null) {
-                    Logger.error(this, "Completed twice: data block "+x+" on "+this);
- return true;
- }
- dataBlockInserters[x] = null;
- }
- blocksCompleted++;
-            if(blocksCompleted != dataBlockInserters.length + checkBlockInserters.length) return true;
- return false;
- }
- }
-
- public boolean isFinished() {
- return finished;
- }
-
- public boolean isEncoded() {
- return encoded;
- }
-
- public int countCheckBlocks() {
- return checkBlocks.length;
- }
-
- public FreenetURI[] getCheckURIs() {
- return checkURIs;
- }
-
- public FreenetURI[] getDataURIs() {
- return dataURIs;
- }
-
- InserterException getException() {
- return toThrow;
- }
-
- public void cancel() {
- synchronized(this) {
- if(finished) return;
- finished = true;
- }
-        if(toThrow == null)
-            toThrow = new InserterException(InserterException.CANCELLED);
- for(int i=0;i<dataBlockInserters.length;i++) {
- SingleBlockInserter sbi = dataBlockInserters[i];
- if(sbi != null)
- sbi.cancel();
- }
- for(int i=0;i<checkBlockInserters.length;i++) {
- SingleBlockInserter sbi = checkBlockInserters[i];
- if(sbi != null)
- sbi.cancel();
- }
- parent.segmentFinished(this);
- }
-
-    public void onTransition(ClientPutState oldState, ClientPutState newState) {
-        Logger.error(this, "Illegal transition in SplitFileInserterSegment: "+oldState+" -> "+newState);
-    }
-
- public void onMetadata(Metadata m, ClientPutState state) {
- Logger.error(this, "Got onMetadata from "+state);
- }
-}
Copied: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java (from rev 7929, branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java)
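
The segment callbacks above identify a block by a single integer token: data blocks take 0..numData-1 and check blocks take numData..numData+numCheck-1, so one int says both which block finished and which array it lives in. A small self-contained sketch of that convention (illustrative names only):

    public class TokenDispatchSketch {
        static String describe(int token, int numData, int numCheck) {
            if (token < numData)
                return "data block " + token;
            else if (token < numData + numCheck)
                return "check block " + (token - numData);
            else
                throw new IllegalArgumentException("Unrecognized block number: " + token);
        }

        public static void main(String[] args) {
            System.out.println(describe(3, 4, 2));  // data block 3
            System.out.println(describe(5, 4, 2));  // check block 1
        }
    }
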
Modified: trunk/freenet/src/freenet/keys/Key.java
===================================================================
--- trunk/freenet/src/freenet/keys/Key.java 2006-01-25 23:39:26 UTC (rev
7929)
+++ trunk/freenet/src/freenet/keys/Key.java 2006-01-25 23:47:20 UTC (rev
7930)
@@ -136,7 +136,7 @@
// because compressing it improves its entropy.
if(sourceData.size() > MAX_LENGTH_BEFORE_COMPRESSION)
throw new KeyEncodeException("Too big");
- if(!dontCompress) {
+ if((!dontCompress) || alreadyCompressedCodec >= 0) {
byte[] cbuf = null;
if(alreadyCompressedCodec >= 0) {
if(sourceData.size() > MAX_COMPRESSED_DATA_LENGTH)
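
The one-line Key.java fix above widens the guard so that data arriving with a codec already assigned (alreadyCompressedCodec >= 0) still enters the compression branch even when dontCompress is set, presumably so the pre-compressed case keeps getting its size check and codec handling. A tiny sketch of the predicate; the class name is hypothetical:

    public class CompressGuardSketch {
        static boolean enterCompressionBranch(boolean dontCompress, int alreadyCompressedCodec) {
            return !dontCompress || alreadyCompressedCodec >= 0;
        }

        public static void main(String[] args) {
            System.out.println(enterCompressionBranch(true, -1));  // false: skip, raw data
            System.out.println(enterCompressionBranch(true, 0));   // true: pre-compressed data
            System.out.println(enterCompressionBranch(false, -1)); // true: try compressing
        }
    }
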
Modified: trunk/freenet/src/freenet/node/LowLevelGetException.java
===================================================================
--- trunk/freenet/src/freenet/node/LowLevelGetException.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/LowLevelGetException.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -56,7 +56,7 @@
this.code = code;
}
- LowLevelGetException(int reason) {
+ public LowLevelGetException(int reason) {
super(getMessage(reason));
this.code = reason;
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-01-25 23:39:26 UTC (rev
7929)
+++ trunk/freenet/src/freenet/node/Node.java 2006-01-25 23:47:20 UTC (rev
7930)
@@ -27,6 +27,7 @@
import freenet.client.ArchiveManager;
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
+import freenet.client.async.ClientRequestScheduler;
import freenet.clients.http.FproxyToadlet;
import freenet.clients.http.SimpleToadletServer;
import freenet.crypt.DSAPublicKey;
@@ -53,6 +54,7 @@
import freenet.keys.ClientSSKBlock;
import freenet.keys.Key;
import freenet.keys.KeyBlock;
+import freenet.keys.KeyVerifyException;
import freenet.keys.NodeCHK;
import freenet.keys.NodeSSK;
import freenet.keys.SSKBlock;
@@ -76,7 +78,7 @@
/**
* @author amphibian
*/
-public class Node implements QueueingSimpleLowLevelClient {
+public class Node {
static final long serialVersionUID = -1;
@@ -190,6 +192,8 @@
final File downloadDir;
final TestnetHandler testnetHandler;
final TestnetStatusUploader statusUploader;
+ public final ClientRequestScheduler fetchScheduler;
+ public final ClientRequestScheduler putScheduler;
// Client stuff that needs to be configged - FIXME
static final int MAX_ARCHIVE_HANDLERS = 200; // don't take up much RAM... FIXME
@@ -333,7 +337,7 @@
t.setPriority(Thread.MAX_PRIORITY);
t.start();
SimpleToadletServer server = new SimpleToadletServer(port+2000);
-    FproxyToadlet fproxy = new FproxyToadlet(n.makeClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0));
+    FproxyToadlet fproxy = new FproxyToadlet(n.makeClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS));
server.register(fproxy, "/", false);
System.out.println("Starting fproxy on port "+(port+2000));
new FCPServer(port+3000, n);
@@ -450,11 +454,17 @@
    tempBucketFactory = new PaddedEphemerallyEncryptedBucketFactory(new TempBucketFactory(tempFilenameGenerator), random, 1024);
    archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS, MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE, MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
requestThrottle = new RequestThrottle(5000, 2.0F);
-    requestStarter = new RequestStarter(requestThrottle, "Request starter ("+portNumber+")");
+    requestStarter = new RequestStarter(this, requestThrottle, "Request starter ("+portNumber+")");
+    fetchScheduler = new ClientRequestScheduler(false, random, requestStarter, this);
+    requestStarter.setScheduler(fetchScheduler);
+    requestStarter.start();
    //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, requestThrottle);
    // FIXME reenable the above
    insertThrottle = new RequestThrottle(10000, 2.0F);
-    insertStarter = new RequestStarter(insertThrottle, "Insert starter ("+portNumber+")");
+    insertStarter = new RequestStarter(this, insertThrottle, "Insert starter ("+portNumber+")");
+    putScheduler = new ClientRequestScheduler(true, random, insertStarter, this);
+    insertStarter.setScheduler(putScheduler);
+    insertStarter.start();
if(testnetHandler != null)
testnetHandler.start();
if(statusUploader != null)
@@ -469,18 +479,6 @@
usm.start();
}
-    public ClientKeyBlock getKey(ClientKey key, boolean localOnly, RequestStarterClient client, boolean cache, boolean ignoreStore) throws LowLevelGetException {
-        if(key instanceof ClientSSK) {
-            ClientSSK k = (ClientSSK) key;
-            if(k.getPubKey() != null)
-                cacheKey(k.pubKeyHash, k.getPubKey());
-        }
-        if(localOnly)
-            return realGetKey(key, localOnly, cache, ignoreStore);
-        else
-            return client.getKey(key, localOnly, cache, ignoreStore);
-    }
-
    public ClientKeyBlock realGetKey(ClientKey key, boolean localOnly, boolean cache, boolean ignoreStore) throws LowLevelGetException {
        if(key instanceof ClientCHK)
            return realGetCHK((ClientCHK)key, localOnly, cache, ignoreStore);
@@ -667,10 +665,6 @@
}
}
-    public void putKey(ClientKeyBlock block, RequestStarterClient client, boolean cache) throws LowLevelPutException {
-        client.putKey(block, cache);
-    }
-
    public void realPut(ClientKeyBlock block, boolean cache) throws LowLevelPutException {
        if(block instanceof ClientCHKBlock)
            realPutCHK((ClientCHKBlock)block, cache);
@@ -1363,8 +1357,8 @@
writeNodeFile();
}
-    public HighLevelSimpleClient makeClient(short prioClass, short prio) {
-        return new HighLevelSimpleClientImpl(this, archiveManager, tempBucketFactory, random, makeStarterClient(prioClass, prio, false), makeStarterClient(prioClass, prio, true), !DONT_CACHE_LOCAL_REQUESTS);
+    public HighLevelSimpleClient makeClient(short prioClass) {
+        return new HighLevelSimpleClientImpl(this, archiveManager, tempBucketFactory, random, !DONT_CACHE_LOCAL_REQUESTS, prioClass);
    }
private static class MemoryChecker implements Runnable {
@@ -1390,10 +1384,6 @@
return insertThrottle;
}
-    public RequestStarterClient makeStarterClient(short prioClass, short prio, boolean inserts) {
-        return new RequestStarterClient(prioClass, prio, random, this, inserts ? insertStarter : requestStarter);
-    }
-
InetAddress lastIP;
public void redetectAddress() {
@@ -1474,4 +1464,28 @@
public boolean isTestnetEnabled() {
return testnetEnabled;
}
+
+    public ClientKeyBlock fetchKey(ClientKey key) throws KeyVerifyException {
+        if(key instanceof ClientCHK)
+            return fetch((ClientCHK)key);
+        else if(key instanceof ClientSSK)
+            return fetch((ClientSSK)key);
+        else
+            throw new IllegalStateException("Don't know what to do with "+key);
+    }
+
+    private ClientKeyBlock fetch(ClientSSK clientSSK) throws SSKVerifyException {
+        DSAPublicKey key = getKey(clientSSK.pubKeyHash);
+        if(key == null) return null;
+        clientSSK.setPublicKey(key);
+        SSKBlock block = fetch((NodeSSK)clientSSK.getNodeKey());
+        if(block == null) return null;
+        return new ClientSSKBlock(block, clientSSK);
+    }
+
+    private ClientKeyBlock fetch(ClientCHK clientCHK) throws CHKVerifyException {
+        CHKBlock block = fetch(clientCHK.getNodeCHK());
+        if(block == null) return null;
+        return new ClientCHKBlock(block, clientCHK);
+    }
}
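
The new fetchKey() above dispatches on the concrete key type via instanceof and overloaded private helpers. A self-contained sketch of that shape, with trivial stand-ins for the real key classes:

    public class FetchDispatchSketch {
        interface ClientKey {}
        static class ClientCHK implements ClientKey {}
        static class ClientSSK implements ClientKey {}

        // One public entry point routing to per-key-type helpers.
        static String fetchKey(ClientKey key) {
            if (key instanceof ClientCHK)
                return fetch((ClientCHK) key);
            else if (key instanceof ClientSSK)
                return fetch((ClientSSK) key);
            else
                throw new IllegalStateException("Don't know what to do with " + key);
        }

        private static String fetch(ClientCHK chk) { return "CHK block"; }
        private static String fetch(ClientSSK ssk) { return "SSK block (pubkey resolved first)"; }

        public static void main(String[] args) {
            System.out.println(fetchKey(new ClientCHK()));
            System.out.println(fetchKey(new ClientSSK()));
        }
    }
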
Deleted: trunk/freenet/src/freenet/node/QueuedDataRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedDataRequest.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/QueuedDataRequest.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,28 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.KeyBlock;
-
-public class QueuedDataRequest extends QueuedRequest {
-
- private final ClientKey key;
- private final boolean localOnly;
- private final boolean cache;
- private final boolean ignoreStore;
- private QueueingSimpleLowLevelClient client;
-
-    public QueuedDataRequest(ClientKey key, boolean localOnly, boolean cache, QueueingSimpleLowLevelClient client, boolean ignoreStore) {
- this.key = key;
- this.localOnly = localOnly;
- this.client = client;
- this.cache = cache;
- this.ignoreStore = ignoreStore;
- }
-
- public ClientKeyBlock waitAndFetch() throws LowLevelGetException {
- waitForSendClearance();
- return client.realGetKey(key, localOnly, cache, ignoreStore);
- }
-
-}
Deleted: trunk/freenet/src/freenet/node/QueuedInsertRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedInsertRequest.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/QueuedInsertRequest.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,22 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientKeyBlock;
-
-public class QueuedInsertRequest extends QueuedRequest {
-
- private final ClientKeyBlock block;
- private final boolean cache;
- private QueueingSimpleLowLevelClient client;
-
-    public QueuedInsertRequest(ClientKeyBlock block, QueueingSimpleLowLevelClient client, boolean cache) {
- this.block = block;
- this.client = client;
- this.cache = cache;
- }
-
- public void waitAndPut() throws LowLevelPutException {
- waitForSendClearance();
- client.realPut(block, cache);
- }
-}
Deleted: trunk/freenet/src/freenet/node/QueuedRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedRequest.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/node/QueuedRequest.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -1,32 +0,0 @@
-package freenet.node;
-
-/**
- * A request (including both DataRequests and InsertRequests) which can be queued
- * by a RequestStarter.
- */
-public abstract class QueuedRequest {
-
- private boolean clearToSend = false;
-
-    /**
-     * Signal that the starter has cleared this request to send.
-     */
-    public final void clearToSend() {
- synchronized(this) {
- clearToSend = true;
- notifyAll();
- }
- }
-
- protected void waitForSendClearance() {
- synchronized(this) {
- while(!clearToSend) {
- try {
- wait(10*1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- }
- }
-}
Deleted: trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -1,14 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-
-interface QueueingSimpleLowLevelClient extends SimpleLowLevelClient {
-
- /** Unqueued version. Only call from QueuedDataRequest ! */
-    ClientKeyBlock realGetKey(ClientKey key, boolean localOnly, boolean cache, boolean ignoreStore) throws LowLevelGetException;
-
- /** Ditto */
-    void realPut(ClientKeyBlock block, boolean cache) throws LowLevelPutException;
-
-}
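
For reference, the blocking pattern that the deleted Queued* classes implemented was a clear-to-send handshake: a request thread parks until the starter releases it. A self-contained sketch of that handshake (the pattern the new ClientRequestScheduler replaces, not current API):

    public class ClearToSendSketch {
        static class QueuedRequest {
            private boolean clearToSend = false;
            // Called by the starter when it is this request's turn.
            synchronized void clearToSend() { clearToSend = true; notifyAll(); }
            // Called by the requesting thread; blocks until released.
            synchronized void waitForSendClearance() throws InterruptedException {
                while (!clearToSend) wait(10000);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            QueuedRequest req = new QueuedRequest();
            Thread worker = new Thread(() -> {
                try {
                    req.waitForSendClearance();
                    System.out.println("request sent");
                } catch (InterruptedException ignored) {}
            });
            worker.start();
            Thread.sleep(100);  // the starter decides it is this request's turn
            req.clearToSend();
            worker.join();
        }
    }
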
Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -73,10 +73,6 @@
b.peers.connect(a.exportFieldSet());
}
-    RequestStarterClient[] starters = new RequestStarterClient[NUMBER_OF_NODES];
-    for(int i=0;i<starters.length;i++)
-        starters[i] = nodes[i].makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, false); // pretend they are all requests
-
Logger.normal(RealNodeRoutingTest.class, "Added random links");
SwapRequestInterval sri =
@@ -182,7 +178,7 @@
    Logger.error(RealNodeRequestInsertTest.class, "Decoded: "+new String(newBlock.memoryDecode()));
    Logger.error(RealNodeRequestInsertTest.class,"CHK: "+chk.getURI());
    Logger.error(RealNodeRequestInsertTest.class,"Headers: "+HexUtil.bytesToHex(block.getHeaders()));
-    randomNode.putKey(block, starters[node1], true);
+    randomNode.realPut(block, true);
    Logger.error(RealNodeRequestInsertTest.class, "Inserted to "+node1);
    Logger.error(RealNodeRequestInsertTest.class, "Data: "+Fields.hashCode(encData)+", Headers: "+Fields.hashCode(encHeaders));
// Pick random node to request from
@@ -191,7 +187,7 @@
node2 = random.nextInt(NUMBER_OF_NODES);
} while(node2 == node1);
Node fetchNode = nodes[node2];
-    block = (ClientCHKBlock) fetchNode.getKey((ClientKey) chk, false, starters[node2], true, false);
+    block = (ClientCHKBlock) fetchNode.realGetKey((ClientKey) chk, false, true, false);
if(block == null) {
Logger.error(RealNodeRequestInsertTest.class, "Fetch
FAILED from "+node2);
requestsAvg.report(0.0);
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -3,6 +3,9 @@
import java.util.LinkedList;
import java.util.Vector;
+import freenet.client.async.ClientRequest;
+import freenet.client.async.RequestScheduler;
+import freenet.client.async.SendableRequest;
import freenet.support.Logger;
import freenet.support.UpdatableSortedLinkedList;
import freenet.support.UpdatableSortedLinkedListKilledException;
@@ -33,155 +36,94 @@
/** Anything less important than prefetch (redundant??) */
public static final short MINIMUM_PRIORITY_CLASS = 6;
- // Clients registered
- final Vector clientsByPriority;
+    public static final short NUMBER_OF_PRIORITY_CLASSES = MINIMUM_PRIORITY_CLASS - MAXIMUM_PRIORITY_CLASS;
+
final RequestThrottle throttle;
- /*
- * Clients which are ready.
- * How do we do round-robin?
- * Have a list of clients which are ready to go, in priority order, and
- * haven't gone this cycle.
- * Have a list of clients which are ready to go next cycle, in priority
- * order.
- * Have each client track the cycle number in which it was last sent.
- */
- final UpdatableSortedLinkedList clientsReadyThisCycle;
- final UpdatableSortedLinkedList clientsReadyNextCycle;
- /** Increment every time we go through the whole list */
- long cycleNumber;
+ RequestScheduler sched;
+ final Node node;
+ private long sentRequestTime;
- public RequestStarter(RequestThrottle throttle, String name) {
- clientsByPriority = new Vector();
- clientsReadyThisCycle = new UpdatableSortedLinkedList();
- clientsReadyNextCycle = new UpdatableSortedLinkedList();
- cycleNumber = 0;
+    public RequestStarter(Node node, RequestThrottle throttle, String name) {
+ this.node = node;
this.throttle = throttle;
this.name = name;
+ }
+
+ void setScheduler(RequestScheduler sched) {
+ this.sched = sched;
+ }
+
+ void start() {
Thread t = new Thread(this, name);
t.setDaemon(true);
t.start();
}
-
+
final String name;
public String toString() {
return name;
}
- public synchronized void registerClient(RequestStarterClient client) {
- int p = client.priority;
- LinkedList prio = makePriority(p);
- prio.add(client);
- }
-
- public synchronized void notifyReady(RequestStarterClient client) {
- Logger.minor(this, "notifyReady("+client+")");
- try {
- if(client.getCycleLastSent() == cycleNumber) {
- clientsReadyNextCycle.addOrUpdate(client);
- } else {
- // Can send immediately
- clientsReadyThisCycle.addOrUpdate(client);
+ void realRun() {
+ SendableRequest req = sched.removeFirst();
+ if(req != null) {
+            // Create a thread to handle starting the request, and the resulting feedback
+ Thread t = new Thread(new SenderThread(req));
+ t.setDaemon(true);
+ t.start();
+ sentRequestTime = System.currentTimeMillis();
+ // Wait
+ long delay = throttle.getDelay();
+ Logger.minor(this, "Delay="+delay+" from "+throttle);
+ long sleepUntil = sentRequestTime + delay;
+ long now;
+ do {
+ now = System.currentTimeMillis();
+ if(now < sleepUntil)
+ try {
+ Thread.sleep(sleepUntil - now);
+                    Logger.minor(this, "Slept: "+(sleepUntil-now)+"ms");
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ } while(now < sleepUntil);
+ } else {
+ synchronized(this) {
+ // Always take the lock on RequestStarter first.
+ req = sched.removeFirst();
+ if(req != null) return;
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
}
- } catch (UpdatableSortedLinkedListKilledException e) {
- throw new Error(e);
}
- notifyAll();
}
- private synchronized LinkedList makePriority(int p) {
- while(p >= clientsByPriority.size()) {
- clientsByPriority.add(new LinkedList());
- }
- return (LinkedList) clientsByPriority.get(p);
- }
-
public void run() {
- long sentRequestTime = System.currentTimeMillis();
while(true) {
- RequestStarterClient client;
- client = getNextClient();
- Logger.minor(this, "getNextClient() = "+client);
- if(client != null) {
- boolean success;
- try {
- success = client.send(cycleNumber);
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t);
- continue;
- }
- if(success) {
-                    sentRequestTime = System.currentTimeMillis();
- Logger.minor(this, "Sent");
- if(client.isReady()) {
- synchronized(this) {
- try {
-                            clientsReadyNextCycle.addOrUpdate(client);
-                        } catch (UpdatableSortedLinkedListKilledException e) {
-                            // Impossible
-                            throw new Error(e);
- }
- }
- }
- }
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
}
- while(true) {
- long delay = throttle.getDelay();
- long sleepUntil = sentRequestTime + delay;
- long now = System.currentTimeMillis();
- if(sleepUntil < now) {
- if(waitingClients()) break;
- // Otherwise wait for notification
- try {
- synchronized(this) {
- wait(1000);
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- } else {
-                Logger.minor(this, "delay="+delay+"("+throttle+") sleep for "+(sleepUntil-now)+" for "+this);
-                if(sleepUntil - now > 0)
-                    try {
-                        synchronized(this) {
-                            // At most sleep 500ms, then recompute.
-                            wait(Math.min(sleepUntil - now, 500));
- }
- } catch (InterruptedException
e) {
- // Ignore
- }
- }
- }
}
}
+
+ private class SenderThread implements Runnable {
-    private synchronized boolean waitingClients() {
-        return !(clientsReadyThisCycle.isEmpty() && clientsReadyNextCycle.isEmpty());
-    }
+ private final SendableRequest req;
+
+ public SenderThread(SendableRequest req) {
+ this.req = req;
+ }
- /**
- * Get the next ready client.
- */
- private synchronized RequestStarterClient getNextClient() {
- try {
- while(true) {
-            if(clientsReadyThisCycle.isEmpty() && clientsReadyNextCycle.isEmpty())
-                return null;
- if(clientsReadyThisCycle.isEmpty()) {
- cycleNumber++;
-                clientsReadyNextCycle.moveTo(clientsReadyThisCycle);
- }
-            RequestStarterClient c = (RequestStarterClient) clientsReadyThisCycle.removeLowest();
- if(c.getCycleLastSent() == cycleNumber) {
- clientsReadyNextCycle.add(c);
- continue;
- } else {
- c.setCycleLastSet(cycleNumber);
- return c;
- }
- }
- } catch (UpdatableSortedLinkedListKilledException e) {
- throw new Error(e);
+ public void run() {
+ req.send(node);
}
+
}
+
}
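
The reworked RequestStarter above boils down to: pop a request from the scheduler, hand it to a daemon sender thread, then sleep out the throttle delay before starting the next one. A self-contained sketch of that loop, with a plain queue standing in for RequestScheduler and a fixed delay standing in for throttle.getDelay():

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class StarterLoopSketch {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Runnable> sched = new LinkedBlockingQueue<>();
            long delayMs = 200; // stand-in for throttle.getDelay()
            for (int i = 0; i < 3; i++) {
                int n = i;
                sched.add(() -> System.out.println("sending request " + n));
            }
            while (!sched.isEmpty()) {
                Runnable req = sched.poll();
                Thread t = new Thread(req);   // SenderThread equivalent
                t.setDaemon(true);
                t.start();
                Thread.sleep(delayMs);        // throttle between request starts
            }
        }
    }
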
Deleted: trunk/freenet/src/freenet/node/RequestStarterClient.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarterClient.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/RequestStarterClient.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,118 +0,0 @@
-package freenet.node;
-
-import java.util.Vector;
-
-import freenet.crypt.RandomSource;
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.ClientSSKBlock;
-import freenet.keys.KeyBlock;
-import freenet.support.DoublyLinkedList;
-import freenet.support.UpdatableSortedLinkedListItemImpl;
-
-/**
- * Interface to clients for starting a request.
- * Also represents a single client for fairness purposes.
- */
-public class RequestStarterClient extends UpdatableSortedLinkedListItemImpl {
-
- final int priority;
- private int random;
- private long cycleLastSent;
- private final Vector requests;
- private final RandomSource rs;
- private final QueueingSimpleLowLevelClient client;
- private final RequestStarter starter;
-
-    public RequestStarterClient(short prioClass, short prio, RandomSource r, QueueingSimpleLowLevelClient c, RequestStarter starter) {
-        this((prioClass << 16) + prio, r, c, starter);
- }
-
-    private RequestStarterClient(int prio, RandomSource r, QueueingSimpleLowLevelClient c, RequestStarter starter) {
- priority = prio;
- this.random = r.nextInt();
- this.starter = starter;
- this.cycleLastSent = -1;
- this.requests = new Vector();
- this.rs = r;
- this.client = c;
- starter.registerClient(this);
- }
-
- /**
- * Blocking fetch of a key.
- * @throws LowLevelGetException If the fetch failed for some reason.
- */
-    public ClientKeyBlock getKey(ClientKey key, boolean localOnly, boolean cache, boolean ignoreStore) throws LowLevelGetException {
-        QueuedDataRequest qdr = new QueuedDataRequest(key, localOnly, cache, client, ignoreStore);
- addRequest(qdr);
- return qdr.waitAndFetch();
- }
-
- /**
- * Blocking insert of a key.
-     * @throws LowLevelPutException If the insert failed for some reason.
- */
-    public void putKey(ClientKeyBlock block, boolean cache) throws LowLevelPutException {
-        QueuedInsertRequest qir = new QueuedInsertRequest(block, client, cache);
- addRequest(qir);
- qir.waitAndPut();
- }
-
- void addRequest(QueuedRequest qr) {
- synchronized(this) {
- requests.add(qr);
- }
- if(starter != null)
- starter.notifyReady(this);
- }
-
- public long getCycleLastSent() {
- return cycleLastSent;
- }
-
- private DoublyLinkedList parentList;
-
- public DoublyLinkedList getParent() {
- return parentList;
- }
-
- public DoublyLinkedList setParent(DoublyLinkedList l) {
- DoublyLinkedList oldList = parentList;
- parentList = l;
- return oldList;
- }
-
- public int compareTo(Object o) {
- if(this == o) return 0;
- RequestStarterClient c = (RequestStarterClient) o;
- if(priority > c.priority) return 1;
- if(priority < c.priority) return -1;
- if(random > c.random) return 1;
- if(random < c.random) return -1;
- return 0;
- }
-
- public synchronized boolean isReady() {
- return !requests.isEmpty();
- }
-
- public boolean send(long cycleNumber) {
- QueuedRequest qr;
- synchronized(this) {
- if(!requests.isEmpty()) {
- int x = rs.nextInt(requests.size());
- qr = (QueuedRequest) requests.remove(x);
- } else qr = null;
- }
- if(qr == null) return false;
- qr.clearToSend();
- return true;
- }
-
- public void setCycleLastSet(long cycleNumber) {
- this.cycleLastSent = cycleNumber;
- }
-
-}
Deleted: trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleLowLevelClient.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/SimpleLowLevelClient.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,31 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientCHKBlock;
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.keys.KeyBlock;
-
-/**
- * @author amphibian
- *
- * Simple client interface... fetch and push single CHKs. No
- * splitfile decoding, no DBRs, no SSKs, for now.
- *
- * We can build higher layers on top of this.
- */
-public interface SimpleLowLevelClient {
-
- /**
- * Fetch a key. Throws if it cannot fetch it.
- * @param cache If false, don't cache the data. See the comments at the top
- * of Node.java.
- */
-    public ClientKeyBlock getKey(ClientKey key, boolean localOnly, RequestStarterClient client, boolean cache, boolean ignoreStore) throws LowLevelGetException;
-
- /**
- * Insert a key.
- * @param cache If false, don't cache the data. See the comments at the top
- * of Node.java.
- */
-    public void putKey(ClientKeyBlock key, RequestStarterClient sctx, boolean cache) throws LowLevelPutException;
-}
Modified: trunk/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
--- trunk/freenet/src/freenet/node/TextModeClientInterface.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/TextModeClientInterface.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -1,10 +1,8 @@
package freenet.node;
import java.io.BufferedReader;
-import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
@@ -51,19 +49,15 @@
final Node n;
final HighLevelSimpleClient client;
final Hashtable streams;
- final RequestStarterClient requestStarterClient;
- final RequestStarterClient insertStarterClient;
final File downloadsDir;
TextModeClientInterface(Node n) {
this.n = n;
-    client = n.makeClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0);
+ client = n.makeClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS);
client.addGlobalHook(new EventDumper());
this.r = n.random;
streams = new Hashtable();
new Thread(this, "Text mode client interface").start();
-    this.requestStarterClient = n.makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, false);
-    this.insertStarterClient = n.makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, true);
this.downloadsDir = n.downloadDir;
}
@@ -361,6 +355,8 @@
// Guess MIME type
String mimeType = DefaultMIMETypes.guessMIMEType(line);
System.out.println("Using MIME type: "+mimeType);
+ if(mimeType.equals(DefaultMIMETypes.DEFAULT_MIME_TYPE))
+ mimeType = ""; // don't need to override it
FileBucket fb = new FileBucket(f, true, false, false);
InsertBlock block = new InsertBlock(fb, new
ClientMetadata(mimeType), FreenetURI.EMPTY_CHK_URI);
@@ -505,6 +501,8 @@
HashMap ret = new HashMap();
File filelist[] = thisdir.listFiles();
+ if(filelist == null)
+ throw new IllegalArgumentException("No such directory");
for(int i = 0 ; i < filelist.length ; i++) {
if (filelist[i].isFile() && filelist[i].canRead()) {
File f = filelist[i];
@@ -525,102 +523,6 @@
return ret;
}
- private String dirPutToList(HashMap dir, String basedir) {
- String ret = "";
- for(Iterator i = dir.keySet().iterator();i.hasNext();) {
- String key = (String) i.next();
- Object o = dir.get(key);
- Metadata target;
- if(o instanceof String) {
- // File
- ret += basedir + key + "\n";
- } else if(o instanceof HashMap) {
-            } else if(o instanceof HashMap) {
-                ret += dirPutToList((HashMap)o, basedir + key + "//");
-            } else throw new IllegalArgumentException("Not String nor HashMap: "+o);
- return ret;
- }
-
- private HashMap dirPut(String directory, boolean getCHKOnly) {
- if (!directory.endsWith("/"))
- directory = directory + "/";
- File thisdir = new File(directory);
-
- System.out.println("Listing dir: "+thisdir);
-
- HashMap ret = new HashMap();
-
- File filelist[] = thisdir.listFiles();
- for(int i = 0 ; i < filelist.length ; i++)
- if (filelist[i].isFile()) {
- FreenetURI uri = null;
- File f = filelist[i];
- String line = f.getAbsolutePath();
-            // To ease cleanup, the following code is taken from above,
-            // except for the uri declaration. Some lines are also commented out.
-            //////////////////////////////////////////////////////////////////////////////////////
- System.out.println("Attempting to read file "+line);
- long startTime = System.currentTimeMillis();
- try {
- if(!(f.exists() && f.canRead())) {
- throw new FileNotFoundException();
- }
-
- // Guess MIME type
- String mimeType = DefaultMIMETypes.guessMIMEType(line);
- System.out.println("Using MIME type: "+mimeType);
-
- FileBucket fb = new FileBucket(f, true, false, false);
-            InsertBlock block = new InsertBlock(fb, new ClientMetadata(mimeType), FreenetURI.EMPTY_CHK_URI);
-
- startTime = System.currentTimeMillis();
- // Declaration is moved out!!!!!!!!!!!!
- uri = client.insert(block, getCHKOnly);
-
- // FIXME depends on CHK's still being renamable
- //uri = uri.setDocName(f.getName());
-
- System.out.println("URI: "+uri);
- long endTime = System.currentTimeMillis();
- long sz = f.length();
- double rate = 1000.0 * sz / (endTime-startTime);
- System.out.println("Upload rate: "+rate+" bytes / second");
- } catch (FileNotFoundException e1) {
- System.out.println("File not found");
- } catch (InserterException e) {
-                System.out.println("Finished insert but: "+e.getMessage());
- if(e.uri != null) {
-                    System.out.println("URI would have been: "+e.uri);
- long endTime = System.currentTimeMillis();
- long sz = f.length();
- double rate = 1000.0 * sz / (endTime-startTime);
-                    System.out.println("Upload rate: "+rate+" bytes / second");
- }
-                if(e.errorCodes != null) {
-                    System.out.println("Splitfile errors breakdown:");
-                    System.out.println(e.errorCodes.toVerboseString());
- }
- } catch (Throwable t) {
- System.out.println("Insert threw: "+t);
- t.printStackTrace();
- }
-
//////////////////////////////////////////////////////////////////////////////////////
-
- if (uri != null)
- ret.put(filelist[i].getName(), uri.toString(false));
- else
- System.err.println("Could not insert file.");
- //ret.put(filelist[i].getName(), null);
- } else {
-                HashMap subdir = dirPut(filelist[i].getAbsolutePath(), getCHKOnly);
- ret.put(filelist[i].getName(), subdir);
- }
-
- return ret;
- }
-
-
/**
* @return A block of text, input from stdin, ending with a
* . on a line by itself. Does some mangling for a fieldset if
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-01-25 23:39:26 UTC (rev
7929)
+++ trunk/freenet/src/freenet/node/Version.java 2006-01-25 23:47:20 UTC (rev
7930)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 380;
+ public static final int buildNumber = 381;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 380;
+ public static final int lastGoodBuild = 381;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -2,8 +2,12 @@
import freenet.client.FetchException;
import freenet.client.FetchResult;
-import freenet.client.Fetcher;
import freenet.client.FetcherContext;
+import freenet.client.InserterException;
+import freenet.client.async.BaseClientPutter;
+import freenet.client.async.ClientCallback;
+import freenet.client.async.ClientGetter;
+import freenet.client.async.ClientPutter;
import freenet.keys.FreenetURI;
import freenet.support.Logger;
@@ -11,17 +15,20 @@
* A simple client fetch. This can of course fetch arbitrarily large
* files, including splitfiles, redirects, etc.
*/
-public class ClientGet extends ClientRequest implements Runnable {
+public class ClientGet extends ClientRequest implements ClientCallback {
private final FreenetURI uri;
private final FetcherContext fctx;
- private final Fetcher f;
private final String identifier;
private final int verbosity;
private final FCPConnectionHandler handler;
+ private final ClientGetter getter;
+ private final short priorityClass;
    public ClientGet(FCPConnectionHandler handler, ClientGetMessage message) {
uri = message.uri;
+ // FIXME
+ this.priorityClass = 0;
    // Create a Fetcher directly in order to get more fine-grained control,
    // since the client may override a few context elements.
this.handler = handler;
@@ -39,32 +46,43 @@
        throw new IllegalStateException("Unknown return type: "+message.returnType);
fctx.maxOutputLength = message.maxSize;
fctx.maxTempLength = message.maxTempSize;
-        f = new Fetcher(uri, fctx);
-        Thread t = new Thread(this, "FCP fetcher for "+uri+" ("+identifier+") on "+handler);
-        t.setDaemon(true);
-        t.start();
+        getter = new ClientGetter(this, handler.node.fetchScheduler, uri, fctx, priorityClass);
+        try {
+            getter.start();
+        } catch (FetchException e) {
+            onFailure(e, null);
+        }
}
public void cancel() {
fctx.cancel();
}
-    public void run() {
-        try {
-            FetchResult fr = f.run();
-            // Success!!!
-            FCPMessage msg = new DataFoundMessage(handler, fr, identifier);
-            handler.outputHandler.queue(msg);
-            // Send all the data at once
-            // FIXME there should be other options
-            msg = new AllDataMessage(handler, fr.asBucket(), identifier);
-            handler.outputHandler.queue(msg);
-        } catch (FetchException e) {
-            // Error
-            Logger.minor(this, "Caught "+e, e);
-            FCPMessage msg = new GetFailedMessage(handler, e, identifier);
-            handler.outputHandler.queue(msg);
-        }
+    public void onSuccess(FetchResult result, ClientGetter state) {
+        FCPMessage msg = new DataFoundMessage(handler, result, identifier);
+        handler.outputHandler.queue(msg);
+        // Send all the data at once
+        // FIXME there should be other options
+        msg = new AllDataMessage(handler, result.asBucket(), identifier);
+        handler.outputHandler.queue(msg);
    }
+    public void onFailure(FetchException e, ClientGetter state) {
+        Logger.minor(this, "Caught "+e, e);
+        FCPMessage msg = new GetFailedMessage(handler, e, identifier);
+        handler.outputHandler.queue(msg);
+    }
+
+ public void onSuccess(BaseClientPutter state) {
+ // Ignore
+ }
+
+ public void onFailure(InserterException e, BaseClientPutter state) {
+ // Ignore
+ }
+
+ public void onGeneratedURI(FreenetURI uri, BaseClientPutter state) {
+ // Ignore
+ }
+
}
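
ClientGet now receives results through callbacks rather than a blocking run(). A minimal self-contained sketch of that inversion, with stand-in types rather than the real FCP classes:

    public class GetCallbackSketch {
        interface ClientCallback {
            void onSuccess(String result);
            void onFailure(Exception e);
        }

        // The async getter reports back through the callback instead of returning.
        static void startFetch(String uri, ClientCallback cb) {
            Thread t = new Thread(() -> {
                try {
                    cb.onSuccess("data for " + uri); // stand-in for the real fetch
                } catch (Exception e) {
                    cb.onFailure(e);
                }
            });
            t.setDaemon(true);
            t.start();
        }

        public static void main(String[] args) throws InterruptedException {
            startFetch("CHK@example", new ClientCallback() {
                public void onSuccess(String r) { System.out.println("DataFound: " + r); }
                public void onFailure(Exception e) { System.out.println("GetFailed: " + e); }
            });
            Thread.sleep(100); // let the daemon thread report back
        }
    }
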
Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-01-25 23:39:26 UTC
(rev 7929)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-01-25 23:47:20 UTC
(rev 7930)
@@ -1,51 +1,72 @@
package freenet.node.fcp;
import freenet.client.ClientMetadata;
-import freenet.client.FileInserter;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
import freenet.client.InsertBlock;
import freenet.client.InserterContext;
import freenet.client.InserterException;
+import freenet.client.async.BaseClientPutter;
+import freenet.client.async.ClientCallback;
+import freenet.client.async.ClientGetter;
+import freenet.client.async.ClientPutter;
import freenet.keys.FreenetURI;
-public class ClientPut extends ClientRequest implements Runnable {
+public class ClientPut extends ClientRequest implements ClientCallback {
final FreenetURI uri;
- final FileInserter inserter;
+ final ClientPutter inserter;
final InserterContext ctx;
final InsertBlock block;
final FCPConnectionHandler handler;
final String identifier;
final boolean getCHKOnly;
+ final short priorityClass;
    public ClientPut(FCPConnectionHandler handler, ClientPutMessage message) {
this.handler = handler;
this.identifier = message.identifier;
this.getCHKOnly = message.getCHKOnly;
+ this.priorityClass = 0;
ctx = new InserterContext(handler.defaultInsertContext);
+ ctx.maxInsertRetries = message.maxRetries;
// Now go through the fields one at a time
uri = message.uri;
String mimeType = message.contentType;
        block = new InsertBlock(message.bucket, new ClientMetadata(mimeType), uri);
-        inserter = new FileInserter(ctx);
-        ctx.maxInsertRetries = message.maxRetries;
-        Thread t = new Thread(this, "FCP inserter for "+uri+" ("+identifier+") on "+handler);
-        t.setDaemon(true);
-        t.start();
+        inserter = new ClientPutter(this, message.bucket, uri, new ClientMetadata(mimeType), ctx, handler.node.putScheduler, priorityClass, getCHKOnly, false);
+        try {
+            inserter.start();
+        } catch (InserterException e) {
+            onFailure(e, null);
+        }
}
public void cancel() {
- ctx.cancel();
+ inserter.cancel();
}
-    public void run() {
-        try {
-            FreenetURI uri = inserter.run(block, false, getCHKOnly, false, null);
-            FCPMessage msg = new PutSuccessfulMessage(identifier, uri);
-            handler.outputHandler.queue(msg);
-        } catch (InserterException e) {
-            FCPMessage msg = new PutFailedMessage(e, identifier);
-            handler.outputHandler.queue(msg);
-        }
+    public void onSuccess(BaseClientPutter state) {
+        FCPMessage msg = new PutSuccessfulMessage(identifier, state.getURI());
+        handler.outputHandler.queue(msg);
    }
+    public void onFailure(InserterException e, BaseClientPutter state) {
+        FCPMessage msg = new PutFailedMessage(e, identifier);
+        handler.outputHandler.queue(msg);
+    }
+
+ public void onGeneratedURI(FreenetURI uri, BaseClientPutter state) {
+ FCPMessage msg = new URIGeneratedMessage(uri, identifier);
+ handler.outputHandler.queue(msg);
+ }
+
+ public void onSuccess(FetchResult result, ClientGetter state) {
+ // ignore
+ }
+
+ public void onFailure(FetchException e, ClientGetter state) {
+ // ignore
+ }
+
}
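
On the put side, note the event order the new callbacks allow: onGeneratedURI can fire as soon as the key is known (hence the new URIGeneratedMessage), before onSuccess reports the completed insert. A small sketch with stand-in types; the hash-based URI here is purely illustrative:

    public class PutEventsSketch {
        interface PutCallback {
            void onGeneratedURI(String uri);
            void onSuccess();
        }

        static void insert(byte[] data, PutCallback cb) {
            String uri = "CHK@" + Integer.toHexString(java.util.Arrays.hashCode(data));
            cb.onGeneratedURI(uri); // known as soon as the key is computed
            cb.onSuccess();         // later, once all blocks are stored
        }

        public static void main(String[] args) {
            insert("hello".getBytes(), new PutCallback() {
                public void onGeneratedURI(String uri) { System.out.println("URIGenerated: " + uri); }
                public void onSuccess() { System.out.println("PutSuccessful"); }
            });
        }
    }
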
Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -34,7 +34,7 @@
isClosed = false;
this.bf = node.tempBucketFactory;
requestsByIdentifier = new HashMap();
-        HighLevelSimpleClient client = node.makeClient((short)0,(short)0);
+        HighLevelSimpleClient client = node.makeClient((short)0);
defaultFetchContext = client.getFetcherContext();
defaultInsertContext = client.getInserterContext();
inputHandler.start();
Modified: trunk/freenet/src/freenet/node/fcp/PutFailedMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PutFailedMessage.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/node/fcp/PutFailedMessage.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -14,6 +14,7 @@
final FailureCodeTracker tracker;
final FreenetURI expectedURI;
final String identifier;
+ final boolean isFatal;
public PutFailedMessage(InserterException e, String identifier) {
this.code = e.getMode();
@@ -22,6 +23,7 @@
this.tracker = e.errorCodes;
this.expectedURI = e.uri;
this.identifier = identifier;
+ this.isFatal = e.isFatal();
}
public SimpleFieldSet getFieldSet() {
@@ -34,6 +36,7 @@
if(tracker != null) {
tracker.copyToFieldSet(fs, "Errors.");
}
+ fs.put("Fatal", Boolean.toString(isFatal));
if(expectedURI != null)
fs.put("ExpectedURI", expectedURI.toString());
return fs;
Copied: trunk/freenet/src/freenet/node/fcp/URIGeneratedMessage.java (from rev 7929, branches/async-client/src/freenet/node/fcp/URIGeneratedMessage.java)
Copied: trunk/freenet/src/freenet/support/IntNumberedItem.java (from rev 7929, branches/async-client/src/freenet/support/IntNumberedItem.java)
Copied: trunk/freenet/src/freenet/support/NumberedItemComparator.java (from rev 7929, branches/async-client/src/freenet/support/NumberedItemComparator.java)
Modified: trunk/freenet/src/freenet/support/NumberedRecentItems.java
===================================================================
--- trunk/freenet/src/freenet/support/NumberedRecentItems.java 2006-01-25
23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/support/NumberedRecentItems.java 2006-01-25
23:47:20 UTC (rev 7930)
@@ -28,56 +28,7 @@
items = new NumberedItem[maxSize];
count = 0;
wrapAround = wrap;
- myComparator =
- new Comparator() {
-
- public int compare(Object o1, Object o2) {
- int x = ocompare(o1, o2);
- Logger.minor(this, "compare("+o1+","+o2+") = "+x);
- return x;
- }
-
- public int ocompare(Object o1, Object o2) {
- // Nulls at the end of the list
- if(o1 == null && o2 == null)
- return 0; // null == null
- if(o1 != null && o2 == null)
- return 1; // anything > null
- if(o2 != null && o1 == null)
- return -1;
- long i1, i2;
- if(o1 instanceof NumberedItem)
- i1 = ((NumberedItem)o1).getNumber();
- else if(o1 instanceof Long)
- i1 = ((Long)o1).longValue();
- else throw new ClassCastException(o1.toString());
- if(o2 instanceof NumberedItem)
- i2 = ((NumberedItem)o2).getNumber();
- else if(o2 instanceof Long)
- i2 = ((Long)o2).longValue();
- else throw new ClassCastException(o2.toString());
- if(i1 == i2) return 0;
- if(!wrapAround) {
- if(i1 > i2) return 1;
- else return -1;
- } else {
- long firstDistance, secondDistance;
- if(i1 > i2) {
-                firstDistance = i1 - i2; // smaller => i1 > i2
-                secondDistance = i2 + Long.MAX_VALUE - i1; // smaller => i2 > i1
- } else {
- secondDistance = i2 - i1; // smaller => i2 > i1
-                firstDistance = i1 + Long.MAX_VALUE - i2; // smaller => i1 > i2
- }
- if(Math.abs(firstDistance) < Math.abs(secondDistance)) {
- return 1; // i1>i2
-            } else //if(Math.abs(secondDistance) < Math.abs(firstDistance)) {
- return -1; // i2>i1
- // REDFLAG: base must be odd, so we never get ==
- }
- }
-
- };
+ myComparator = new NumberedItemComparator(wrap);
}
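
The inline comparator deleted above (now factored out as NumberedItemComparator) orders numbers with optional wraparound: whichever value has the shorter distance forward from the other, modulo Long.MAX_VALUE, counts as greater. A self-contained sketch mirroring that deleted logic, simplified to bare longs:

    public class WrapCompareSketch {
        static int compare(long i1, long i2, boolean wrapAround) {
            if (i1 == i2) return 0;
            if (!wrapAround) return (i1 > i2) ? 1 : -1;
            long firstDistance, secondDistance;
            if (i1 > i2) {
                firstDistance = i1 - i2;                    // smaller => i1 "after" i2
                secondDistance = i2 + Long.MAX_VALUE - i1;  // smaller => i2 "after" i1
            } else {
                secondDistance = i2 - i1;
                firstDistance = i1 + Long.MAX_VALUE - i2;
            }
            return (firstDistance < secondDistance) ? 1 : -1;
        }

        public static void main(String[] args) {
            System.out.println(compare(5, 3, false));                 // 1: plain ordering
            System.out.println(compare(2, Long.MAX_VALUE - 1, true)); // 1: 2 is "newer" after wrap
        }
    }
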
public synchronized NumberedItem get(int num) {
Copied: trunk/freenet/src/freenet/support/RandomGrabArray.java (from rev 7929, branches/async-client/src/freenet/support/RandomGrabArray.java)
Copied: trunk/freenet/src/freenet/support/RandomGrabArrayItem.java (from rev 7929, branches/async-client/src/freenet/support/RandomGrabArrayItem.java)
Copied: trunk/freenet/src/freenet/support/RandomGrabArrayWithInt.java (from rev 7929, branches/async-client/src/freenet/support/RandomGrabArrayWithInt.java)
Copied: trunk/freenet/src/freenet/support/SimpleIntNumberedItemComparator.java (from rev 7929, branches/async-client/src/freenet/support/SimpleIntNumberedItemComparator.java)
Copied: trunk/freenet/src/freenet/support/SortedVectorByNumber.java (from rev 7929, branches/async-client/src/freenet/support/SortedVectorByNumber.java)
Modified: trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
2006-01-25 23:39:26 UTC (rev 7929)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
2006-01-25 23:47:20 UTC (rev 7930)
@@ -2,8 +2,6 @@
import java.util.Enumeration;
-import freenet.node.RequestStarterClient;
-
/**
* @author amphibian
*