Author: toad
Date: 2006-01-21 21:18:26 +0000 (Sat, 21 Jan 2006)
New Revision: 7898
Added:
branches/async-client/src/freenet/client/async/ClientPut.java
branches/async-client/src/freenet/client/async/ClientPutState.java
branches/async-client/src/freenet/client/async/GetCompletionCallback.java
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java
branches/async-client/src/freenet/client/async/PutCompletionCallback.java
branches/async-client/src/freenet/client/async/SendableGet.java
branches/async-client/src/freenet/client/async/SendableInsert.java
branches/async-client/src/freenet/client/async/SingleBlockInserter.java
branches/async-client/src/freenet/client/async/SingleFileInserter.java
branches/async-client/src/freenet/client/async/SplitFileInserter.java
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java
branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java
Removed:
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
Modified:
branches/async-client/src/freenet/client/FECCodec.java
branches/async-client/src/freenet/client/FileInserter.java
branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java
branches/async-client/src/freenet/client/InsertBlock.java
branches/async-client/src/freenet/client/InserterContext.java
branches/async-client/src/freenet/client/InserterException.java
branches/async-client/src/freenet/client/Metadata.java
branches/async-client/src/freenet/client/StandardOnionFECCodec.java
branches/async-client/src/freenet/client/async/Client.java
branches/async-client/src/freenet/client/async/ClientGet.java
branches/async-client/src/freenet/client/async/ClientGetState.java
branches/async-client/src/freenet/client/async/ClientRequest.java
branches/async-client/src/freenet/client/async/SendableRequest.java
branches/async-client/src/freenet/client/async/SingleFileFetcher.java
branches/async-client/src/freenet/client/async/SplitFileFetcher.java
branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
Log:
Async branch: simple inserts 99% done.
Modified: branches/async-client/src/freenet/client/FECCodec.java
===================================================================
--- branches/async-client/src/freenet/client/FECCodec.java 2006-01-21
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/FECCodec.java 2006-01-21
21:18:26 UTC (rev 7898)
@@ -2,6 +2,7 @@
import java.io.IOException;
+import freenet.support.Bucket;
import freenet.support.BucketFactory;
/**
@@ -67,6 +68,17 @@
public abstract void encode(SplitfileBlock[] dataBlocks,
SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory)
throws IOException;
/**
+ * Encode all missing *check* blocks.
+ * Requires that all the data blocks be present.
+ * @param dataBlocks The data blocks.
+ * @param checkBlocks The check blocks.
+ * @param blockLength The block length in bytes.
+ * @param bf The BucketFactory to use to generate buckets.
+ * @throws IOException If there is an error in decoding caused by an
I/O error (usually involving buckets).
+ */
+ public abstract void encode(Bucket[] dataBlocks, Bucket[] checkBlocks,
int blockLength, BucketFactory bucketFactory) throws IOException;
+
+ /**
* How many check blocks?
*/
public abstract int countCheckBlocks();
Modified: branches/async-client/src/freenet/client/FileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/FileInserter.java 2006-01-21
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/FileInserter.java 2006-01-21
21:18:26 UTC (rev 7898)
@@ -65,16 +65,13 @@
int blockSize;
int maxSourceDataSize;
boolean isSSK = false;
- boolean dontCompress = false;
+ boolean dontCompress = ctx.dontCompress;
long origSize = data.size();
if(type.equals("SSK") || type.equals("KSK")) {
blockSize = SSKBlock.DATA_LENGTH;
isSSK = true;
maxSourceDataSize =
ClientSSKBlock.MAX_DECOMPRESSED_DATA_LENGTH;
- if(origSize > maxSourceDataSize)
- dontCompress = true;
- // If too big to fit in an SSK, don't even try.
} else if(block.desiredURI.getKeyType().equals("CHK")) {
blockSize = CHKBlock.DATA_LENGTH;
maxSourceDataSize =
ClientCHKBlock.MAX_LENGTH_BEFORE_COMPRESSION;
Modified:
branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -1,7 +1,6 @@
package freenet.client;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.util.HashMap;
import freenet.client.events.ClientEventListener;
@@ -9,10 +8,8 @@
import freenet.client.events.EventLogger;
import freenet.client.events.SimpleEventProducer;
import freenet.crypt.RandomSource;
-import freenet.keys.ClientCHK;
import freenet.keys.ClientKey;
import freenet.keys.FreenetURI;
-import freenet.keys.InsertableClientSSK;
import freenet.node.RequestStarterClient;
import freenet.node.SimpleLowLevelClient;
import freenet.support.Bucket;
Modified: branches/async-client/src/freenet/client/InsertBlock.java
===================================================================
--- branches/async-client/src/freenet/client/InsertBlock.java 2006-01-21
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/InsertBlock.java 2006-01-21
21:18:26 UTC (rev 7898)
@@ -9,8 +9,8 @@
public class InsertBlock {
Bucket data;
- final FreenetURI desiredURI;
- final ClientMetadata clientMetadata;
+ public final FreenetURI desiredURI;
+ public final ClientMetadata clientMetadata;
public InsertBlock(Bucket data, ClientMetadata metadata, FreenetURI
desiredURI) {
this.data = data;
@@ -20,7 +20,9 @@
clientMetadata = metadata;
this.desiredURI = desiredURI;
}
+
+ public Bucket getData() {
+ return data;
+ }
-
-
}
Modified: branches/async-client/src/freenet/client/InserterContext.java
===================================================================
--- branches/async-client/src/freenet/client/InserterContext.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/InserterContext.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -10,16 +10,16 @@
public class InserterContext {
final SimpleLowLevelClient client;
- final BucketFactory bf;
+ public final BucketFactory bf;
/** If true, don't try to compress the data */
- final boolean dontCompress;
- final RandomSource random;
- final short splitfileAlgorithm;
+ public final boolean dontCompress;
+ public final RandomSource random;
+ public final short splitfileAlgorithm;
public int maxInsertRetries;
final int maxSplitInsertThreads;
final int consecutiveRNFsCountAsSuccess;
- final int splitfileSegmentDataBlocks;
- final int splitfileSegmentCheckBlocks;
+ public final int splitfileSegmentDataBlocks;
+ public final int splitfileSegmentCheckBlocks;
final ClientEventProducer eventProducer;
final RequestStarterClient starterClient;
/** Interesting tradeoff, see comments at top of Node.java. */
Modified: branches/async-client/src/freenet/client/InserterException.java
===================================================================
--- branches/async-client/src/freenet/client/InserterException.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/InserterException.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -82,6 +82,8 @@
public static final int ROUTE_REALLY_NOT_FOUND = 8;
/** Collided with pre-existing content */
public static final int COLLISION = 9;
+ /** Cancelled by user */
+ public static final int CANCELLED = 10;
public static String getMessage(int mode) {
switch(mode) {
@@ -103,6 +105,8 @@
return "Insert could not leave the node at all";
case COLLISION:
return "Insert collided with different, pre-existing
data at the same key";
+ case CANCELLED:
+ return "Cancelled by user";
default:
return "Unknown error "+mode;
}
Modified: branches/async-client/src/freenet/client/Metadata.java
===================================================================
--- branches/async-client/src/freenet/client/Metadata.java 2006-01-21
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/Metadata.java 2006-01-21
21:18:26 UTC (rev 7898)
@@ -30,7 +30,7 @@
// document type
byte documentType;
- static final byte SIMPLE_REDIRECT = 0;
+ public static final byte SIMPLE_REDIRECT = 0;
static final byte MULTI_LEVEL_METADATA = 1;
static final byte SIMPLE_MANIFEST = 2;
static final byte ZIP_MANIFEST = 3;
@@ -554,7 +554,7 @@
/**
* Write the data to a byte array.
*/
- byte[] writeToByteArray() {
+ public byte[] writeToByteArray() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
try {
Modified: branches/async-client/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- branches/async-client/src/freenet/client/StandardOnionFECCodec.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/StandardOnionFECCodec.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -8,7 +8,6 @@
import com.onionnetworks.fec.DefaultFECCodeFactory;
import com.onionnetworks.fec.FECCode;
-import com.onionnetworks.fec.PureCode;
import com.onionnetworks.util.Buffer;
import freenet.support.Bucket;
@@ -24,7 +23,7 @@
public class Encoder implements Runnable {
- private final SplitfileBlock[] dataBlockStatus,
checkBlockStatus;
+ private final Bucket[] dataBlockStatus, checkBlockStatus;
private final int blockLength;
private final BucketFactory bf;
private IOException thrownIOE;
@@ -32,7 +31,7 @@
private Error thrownError;
private boolean finished;
- public Encoder(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+ public Encoder(Bucket[] dataBlockStatus, Bucket[]
checkBlockStatus, int blockLength, BucketFactory bf) {
this.dataBlockStatus = dataBlockStatus;
this.checkBlockStatus = checkBlockStatus;
this.blockLength = blockLength;
@@ -288,7 +287,7 @@
}
}
- public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+ public void encode(Bucket[] dataBlockStatus, Bucket[] checkBlockStatus,
int blockLength, BucketFactory bf) throws IOException {
// Encodes count as decodes.
synchronized(runningDecodesSync) {
while(runningDecodes >= PARALLEL_DECODES) {
@@ -324,12 +323,26 @@
}
}
}
+
+ public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+ Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
+ Bucket[] checkBlocks = new Bucket[checkBlockStatus.length];
+ for(int i=0;i<dataBlocks.length;i++)
+ dataBlocks[i] = dataBlockStatus[i].getData();
+ for(int i=0;i<checkBlocks.length;i++)
+ checkBlocks[i] = checkBlockStatus[i].getData();
+ encode(dataBlocks, checkBlocks, blockLength, bf);
+ for(int i=0;i<dataBlocks.length;i++)
+ dataBlockStatus[i].setData(dataBlocks[i]);
+ for(int i=0;i<checkBlocks.length;i++)
+ checkBlockStatus[i].setData(checkBlocks[i]);
+ }
/**
* Do the actual encode.
*/
- private void realEncode(SplitfileBlock[] dataBlockStatus,
- SplitfileBlock[] checkBlockStatus, int blockLength,
BucketFactory bf)
+ private void realEncode(Bucket[] dataBlockStatus,
+ Bucket[] checkBlockStatus, int blockLength,
BucketFactory bf)
throws IOException {
// Runtime.getRuntime().gc();
// Runtime.getRuntime().runFinalization();
@@ -366,7 +379,7 @@
STRIPE_SIZE);
for (int i = 0; i < dataBlockStatus.length; i++) {
- buckets[i] = dataBlockStatus[i].getData();
+ buckets[i] = dataBlockStatus[i];
long sz = buckets[i].size();
if (sz < blockLength) {
if (i != dataBlockStatus.length - 1)
@@ -382,7 +395,7 @@
}
for (int i = 0; i < checkBlockStatus.length; i++) {
- buckets[i + k] = checkBlockStatus[i].getData();
+ buckets[i + k] = checkBlockStatus[i];
if (buckets[i + k] == null) {
buckets[i + k] =
bf.makeBucket(blockLength);
writers[i] = buckets[i +
k].getOutputStream();
@@ -455,7 +468,7 @@
Bucket data = buckets[i + k];
if (data == null)
throw new NullPointerException();
- checkBlockStatus[i].setData(data);
+ checkBlockStatus[i] = data;
}
}
Modified: branches/async-client/src/freenet/client/async/Client.java
===================================================================
--- branches/async-client/src/freenet/client/async/Client.java 2006-01-21
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/Client.java 2006-01-21
21:18:26 UTC (rev 7898)
@@ -2,6 +2,7 @@
import freenet.client.FetchException;
import freenet.client.FetchResult;
+import freenet.client.InserterException;
/**
* A client process. Something that initiates requests, and can cancel
@@ -14,4 +15,8 @@
public void onFailure(FetchException e, ClientGet state);
+ public void onSuccess(ClientPut state);
+
+ public void onFailure(InserterException e, ClientPut state);
+
}
Modified: branches/async-client/src/freenet/client/async/ClientGet.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGet.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientGet.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -12,17 +12,15 @@
/**
* A high level data request.
*/
-public class ClientGet extends ClientRequest implements
RequestCompletionCallback {
+public class ClientGet extends ClientRequest implements GetCompletionCallback {
final Client client;
final FreenetURI uri;
final FetcherContext ctx;
final ArchiveContext actx;
final ClientRequestScheduler scheduler;
- ClientGetState fetchState;
+ ClientGetState currentState;
private boolean finished;
- private boolean cancelled;
- final int priorityClass;
private int archiveRestarts;
public ClientGet(Client client, ClientRequestScheduler sched,
FreenetURI uri, FetcherContext ctx, short priorityClass) {
@@ -33,15 +31,14 @@
this.scheduler = sched;
this.finished = false;
this.actx = new ArchiveContext();
- this.priorityClass = priorityClass;
archiveRestarts = 0;
start();
}
private void start() {
try {
- fetchState = new SingleFileFetcher(this, this, new
ClientMetadata(), uri, ctx, actx, priorityClass, 0, false, null);
- fetchState.schedule();
+ currentState = new SingleFileFetcher(this, this, new
ClientMetadata(), uri, ctx, actx, getPriorityClass(), 0, false, null);
+ currentState.schedule();
} catch (MalformedURLException e) {
onFailure(new
FetchException(FetchException.INVALID_URI, e), null);
} catch (FetchException e) {
@@ -49,16 +46,9 @@
}
}
- public void cancel() {
- cancelled = true;
- }
-
- public boolean isCancelled() {
- return cancelled;
- }
-
public void onSuccess(FetchResult result, ClientGetState state) {
finished = true;
+ currentState = null;
client.onSuccess(result, this);
}
Modified: branches/async-client/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGetState.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientGetState.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -8,9 +8,6 @@
public abstract ClientGet getParent();
- public void schedule() {
- // TODO Auto-generated method stub
-
- }
-
+ public abstract void schedule();
+
}
Added: branches/async-client/src/freenet/client/async/ClientPut.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientPut.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientPut.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,60 @@
+package freenet.client.async;
+
+import freenet.client.ClientMetadata;
+import freenet.client.InsertBlock;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public class ClientPut extends ClientRequest implements PutCompletionCallback {
+
+ final Client client;
+ final Bucket data;
+ final FreenetURI targetURI;
+ final ClientMetadata cm;
+ final InserterContext ctx;
+ final ClientRequestScheduler scheduler;
+ private ClientPutState currentState;
+ private boolean finished;
+ private boolean cancelled;
+
+ public ClientPut(Client client, Bucket data, FreenetURI targetURI,
ClientMetadata cm, InserterContext ctx,
+ ClientRequestScheduler scheduler, short priorityClass) {
+ super(priorityClass);
+ this.cm = cm;
+ this.client = client;
+ this.data = data;
+ this.targetURI = targetURI;
+ this.ctx = ctx;
+ this.scheduler = scheduler;
+ this.finished = false;
+ this.cancelled = false;
+ try {
+ start();
+ } catch (InserterException e) {
+ onFailure(e, null);
+ }
+ }
+
+ private void start() throws InserterException {
+ currentState =
+ new SingleFileInserter(this, this, new
InsertBlock(data, cm, targetURI), false, ctx, false, false);
+ }
+
+ void setCurrentState(ClientPutState s) {
+ currentState = s;
+ }
+
+ public void onSuccess(ClientPutState state) {
+ finished = true;
+ currentState = null;
+ client.onSuccess(this);
+ }
+
+ public void onFailure(InserterException e, ClientPutState state) {
+ finished = true;
+ currentState = null;
+ client.onFailure(e, this);
+ }
+}
Added: branches/async-client/src/freenet/client/async/ClientPutState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientPutState.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientPutState.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,12 @@
+package freenet.client.async;
+
+/**
+ * ClientPutState
+ *
+ * Represents a state in the insert process.
+ */
+public interface ClientPutState {
+
+ public abstract ClientPut getParent();
+
+}
Modified: branches/async-client/src/freenet/client/async/ClientRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequest.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientRequest.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -9,6 +9,7 @@
// FIXME move the priority classes from RequestStarter here
private short priorityClass;
+ private boolean cancelled;
public short getPriorityClass() {
return priorityClass;
@@ -18,6 +19,13 @@
this.priorityClass = priorityClass;
}
- public abstract void cancel();
+ public void cancel() {
+ cancelled = true;
+ }
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+
}
Copied:
branches/async-client/src/freenet/client/async/GetCompletionCallback.java (from
rev 7890,
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java)
===================================================================
---
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
2006-01-20 19:46:21 UTC (rev 7890)
+++ branches/async-client/src/freenet/client/async/GetCompletionCallback.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * Callback called when part of a get request completes - either with a
+ * Bucket full of data, or with an error.
+ */
+public interface GetCompletionCallback {
+
+ public void onSuccess(FetchResult result, ClientGetState state);
+
+ public void onFailure(FetchException e, ClientGetState state);
+
+}
Added:
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java
===================================================================
---
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java
2006-01-21 20:51:10 UTC (rev 7897)
+++
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,61 @@
+package freenet.client.async;
+
+import java.util.LinkedList;
+
+import freenet.client.InserterException;
+
+public class MultiPutCompletionCallback implements PutCompletionCallback,
ClientPutState {
+
+ private final LinkedList waitingFor;
+ private final PutCompletionCallback cb;
+ private final ClientPut parent;
+ private boolean finished;
+ private boolean started;
+
+ public MultiPutCompletionCallback(PutCompletionCallback cb, ClientPut
parent, boolean dontTellParent) {
+ this.cb = cb;
+ this.waitingFor = new LinkedList();
+ this.parent = parent;
+ finished = false;
+ if(!dontTellParent)
+ parent.setCurrentState(this);
+ }
+
+ public synchronized void onSuccess(ClientPutState state) {
+ if(finished) return;
+ waitingFor.remove(state);
+ if(waitingFor.isEmpty() && started) {
+ complete(null);
+ }
+ }
+
+ public synchronized void onFailure(InserterException e, ClientPutState
state) {
+ if(finished) return;
+ waitingFor.remove(state);
+ if(waitingFor.isEmpty()) {
+ complete(e);
+ }
+ }
+
+ private synchronized void complete(InserterException e) {
+ finished = true;
+ if(e != null)
+ cb.onFailure(e, this);
+ else
+ cb.onSuccess(this);
+ }
+
+ public synchronized void add(ClientPutState ps) {
+ if(finished) return;
+ waitingFor.add(ps);
+ }
+
+ public synchronized void arm() {
+ started = true;
+ }
+
+ public ClientPut getParent() {
+ return parent;
+ }
+
+}
Added: branches/async-client/src/freenet/client/async/PutCompletionCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/PutCompletionCallback.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/PutCompletionCallback.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,17 @@
+package freenet.client.async;
+
+import freenet.client.InserterException;
+import freenet.keys.ClientKey;
+
+/**
+ * Callback called when part of a put request completes.
+ */
+public interface PutCompletionCallback {
+
+ public void onSuccess(ClientPutState state);
+
+ public void onFailure(InserterException e, ClientPutState state);
+
+ public void onEncode(ClientKey key);
+
+}
Deleted:
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
===================================================================
---
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
2006-01-21 20:51:10 UTC (rev 7897)
+++
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -1,16 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-
-/**
- * Callback called when part of a get request completes - either with a
- * Bucket full of data, or with an error.
- */
-public interface RequestCompletionCallback {
-
- public void onSuccess(FetchResult result, ClientGetState state);
-
- public void onFailure(FetchException e, ClientGetState state);
-
-}
Added: branches/async-client/src/freenet/client/async/SendableGet.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableGet.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SendableGet.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,20 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelPutException;
+
+/**
+ * A low-level key fetch which can be sent immediately. @see SendableRequest
+ */
+public interface SendableGet extends SendableRequest {
+
+ public ClientKey getKey();
+
+ /** Called when/if the low-level request succeeds. */
+ public void onSuccess(ClientKeyBlock block);
+
+ /** Called when/if the low-level request fails. */
+ public void onFailure(LowLevelPutException e);
+
+}
Added: branches/async-client/src/freenet/client/async/SendableInsert.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableInsert.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SendableInsert.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,26 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelPutException;
+
+/**
+ * Callback interface for a low level insert, which is immediately sendable.
These
+ * should be registered on the ClientRequestScheduler when we want to send
them. It will
+ * then, when it is time to send, create a thread, send the request, and call
the
+ * callback below.
+ */
+public interface SendableInsert extends SendableRequest {
+
+ /** Get the ClientKeyBlock to insert. This may be created
+ * just-in-time, and may return null; ClientRequestScheduler
+ * will simply unregister the SendableInsert if this happens.
+ */
+ public ClientKeyBlock getBlock();
+
+ /** Called when we successfully insert the data */
+ public void onSuccess();
+
+ /** Called when we don't! */
+ public void onFailure(LowLevelPutException e);
+
+}
Modified: branches/async-client/src/freenet/client/async/SendableRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableRequest.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SendableRequest.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -1,25 +1,13 @@
package freenet.client.async;
-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.node.LowLevelPutException;
-
/**
* A low-level request which can be sent immediately. These are registered
* on the ClientRequestScheduler.
*/
public interface SendableRequest {
- public ClientKey getKey();
-
public short getPriorityClass();
public int getRetryCount();
- /** Called when/if the low-level request succeeds. */
- public void onSuccess(ClientKeyBlock block);
-
- /** Called when/if the low-level request fails. */
- public void onFailure(LowLevelPutException e);
-
}
Added: branches/async-client/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleBlockInserter.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SingleBlockInserter.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,186 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.net.MalformedURLException;
+
+import freenet.client.FailureCodeTracker;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.keys.CHKEncodeException;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.keys.FreenetURI;
+import freenet.keys.InsertableClientSSK;
+import freenet.keys.SSKEncodeException;
+import freenet.node.LowLevelPutException;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+
+/**
+ * Insert *ONE KEY*.
+ */
+public class SingleBlockInserter implements SendableInsert, ClientPutState {
+
+ final Bucket sourceData;
+ final short compressionCodec;
+ final FreenetURI uri; // uses essentially no RAM in the common case of
a CHK because we use FreenetURI.EMPTY_CHK_URI
+ FreenetURI resultingURI;
+ final PutCompletionCallback cb;
+ final ClientPut parent;
+ final InserterContext ctx;
+ private int retries;
+ private final FailureCodeTracker errors;
+ private boolean finished;
+ private ClientKey key;
+ private SoftReference refToClientKeyBlock;
+ final int token; // for e.g. splitfiles
+ final boolean isMetadata;
+ final int sourceLength;
+
+ public SingleBlockInserter(ClientPut parent, Bucket data, short
compressionCodec, FreenetURI uri, InserterContext ctx, PutCompletionCallback
cb, boolean isMetadata, int sourceLength, int token) throws InserterException {
+ this.token = token;
+ this.parent = parent;
+ this.retries = 0;
+ this.finished = false;
+ this.ctx = ctx;
+ errors = new FailureCodeTracker(true);
+ this.cb = cb;
+ this.uri = uri;
+ this.compressionCodec = compressionCodec;
+ this.sourceData = data;
+ this.isMetadata = isMetadata;
+ this.sourceLength = sourceLength;
+ }
+
+ protected ClientKeyBlock innerEncode() throws InserterException {
+ String uriType = uri.getKeyType().toUpperCase();
+ if(uriType.equals("CHK")) {
+ try {
+ return ClientCHKBlock.encode(sourceData,
isMetadata, true, compressionCodec, sourceLength);
+ } catch (CHKEncodeException e) {
+ Logger.error(this, "Caught "+e, e);
+ throw new
InserterException(InserterException.INTERNAL_ERROR, e, null);
+ } catch (IOException e) {
+ Logger.error(this, "Caught "+e, e);
+ throw new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ }
+ } else if(uriType.equals("SSK")) {
+ try {
+ InsertableClientSSK ik =
InsertableClientSSK.create(uri);
+ return ik.encode(sourceData, isMetadata, true,
compressionCodec, sourceLength, ctx.random);
+ } catch (MalformedURLException e) {
+ throw new
InserterException(InserterException.INVALID_URI, e, null);
+ } catch (SSKEncodeException e) {
+ Logger.error(this, "Caught "+e, e);
+ throw new
InserterException(InserterException.INTERNAL_ERROR, e, null);
+ } catch (IOException e) {
+ Logger.error(this, "Caught "+e, e);
+ throw new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ }
+ } else {
+ throw new
InserterException(InserterException.INVALID_URI, "Unknown keytype "+uriType,
null);
+ }
+ }
+
+ protected synchronized ClientKeyBlock encode() throws InserterException
{
+ if(refToClientKeyBlock != null) {
+ ClientKeyBlock block = (ClientKeyBlock)
refToClientKeyBlock.get();
+ if(block != null) return block;
+ }
+ ClientKeyBlock block = innerEncode();
+ refToClientKeyBlock =
+ new SoftReference(block);
+ resultingURI = block.getClientKey().getURI();
+ return block;
+ }
+
+ public boolean isInsert() {
+ return true;
+ }
+
+ public short getPriorityClass() {
+ return parent.getPriorityClass();
+ }
+
+ public int getRetryCount() {
+ return retries;
+ }
+
+ public void onFailure(LowLevelPutException e) {
+ if(parent.isCancelled())
+ fail(new
InserterException(InserterException.CANCELLED));
+ if(e.code == LowLevelPutException.COLLISION)
+ fail(new
InserterException(InserterException.COLLISION));
+ switch(e.code) {
+ case LowLevelPutException.INTERNAL_ERROR:
+ errors.inc(InserterException.INTERNAL_ERROR);
+ break;
+ case LowLevelPutException.REJECTED_OVERLOAD:
+ errors.inc(InserterException.REJECTED_OVERLOAD);
+ break;
+ case LowLevelPutException.ROUTE_NOT_FOUND:
+ errors.inc(InserterException.ROUTE_NOT_FOUND);
+ break;
+ case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
+ errors.inc(InserterException.ROUTE_REALLY_NOT_FOUND);
+ break;
+ default:
+ Logger.error(this, "Unknown LowLevelPutException code:
"+e.code);
+ errors.inc(InserterException.INTERNAL_ERROR);
+ }
+ if(retries > ctx.maxInsertRetries) {
+ if(errors.isOneCodeOnly())
+ fail(new
InserterException(errors.getFirstCode()));
+ else
+ fail(new
InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, errors,
getURI()));
+ }
+ retries++;
+ parent.scheduler.register(this);
+ }
+
+ private synchronized void fail(InserterException e) {
+ if(finished) return;
+ finished = true;
+ cb.onFailure(e, this);
+ }
+
+ public ClientKeyBlock getBlock() {
+ try {
+ return encode();
+ } catch (InserterException e) {
+ cb.onFailure(e, this);
+ return null;
+ } catch (Throwable t) {
+ cb.onFailure(new
InserterException(InserterException.INTERNAL_ERROR, t, null), this);
+ return null;
+ }
+ }
+
+ public void schedule() {
+ if(finished) {
+ Logger.normal(this, "Asking to schedule but already
finished");
+ return;
+ }
+ parent.scheduler.register(this);
+ }
+
+ public FreenetURI getURI() {
+ if(resultingURI == null)
+ getBlock();
+ return resultingURI;
+ }
+
+ public void onSuccess() {
+ synchronized(this) {
+ finished = true;
+ }
+ cb.onSuccess(this);
+ }
+
+ public ClientPut getParent() {
+ return parent;
+ }
+
+}
Modified: branches/async-client/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileFetcher.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SingleFileFetcher.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -25,14 +25,14 @@
import freenet.support.compress.CompressionOutputSizeException;
import freenet.support.compress.Compressor;
-public class SingleFileFetcher extends ClientGetState implements
SendableRequest {
+public class SingleFileFetcher extends ClientGetState implements SendableGet {
final ClientGet parent;
//final FreenetURI uri;
final ClientKey key;
final LinkedList metaStrings;
final FetcherContext ctx;
- final RequestCompletionCallback rcb;
+ final GetCompletionCallback rcb;
final ClientMetadata clientMetadata;
private Metadata metadata;
final int maxRetries;
@@ -45,6 +45,7 @@
private int retryCount;
private final LinkedList decompressors;
private final boolean dontTellClientGet;
+ private boolean cancelled;
private Object token;
@@ -53,7 +54,8 @@
* @param token
* @param dontTellClientGet
*/
- public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb,
ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext
ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean
dontTellClientGet, Object token) throws FetchException {
+ public SingleFileFetcher(ClientGet get, GetCompletionCallback cb,
ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext
ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean
dontTellClientGet, Object token) throws FetchException {
+ this.cancelled = false;
this.dontTellClientGet = dontTellClientGet;
this.token = token;
this.parent = get;
@@ -76,12 +78,12 @@
}
/** Called by ClientGet. */
- public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb,
ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext
actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object
token) throws MalformedURLException, FetchException {
+ public SingleFileFetcher(ClientGet get, GetCompletionCallback cb,
ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext
actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object
token) throws MalformedURLException, FetchException {
this(get, cb, metadata, ClientKey.getBaseKey(uri),
uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel,
dontTellClientGet, token);
}
/** Copy constructor, modifies a few given fields, don't call
schedule() */
- public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta,
RequestCompletionCallback callback, FetcherContext ctx2) throws FetchException {
+ public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta,
GetCompletionCallback callback, FetcherContext ctx2) throws FetchException {
this.token = fetcher.token;
this.dontTellClientGet = fetcher.dontTellClientGet;
this.actx = fetcher.actx;
@@ -104,7 +106,7 @@
public void schedule() {
if(!dontTellClientGet)
- this.parent.fetchState = this;
+ this.parent.currentState = this;
parent.scheduler.register(this);
}
@@ -350,10 +352,10 @@
// And will also discover that the data is available, and will
complete.
}
- class ArchiveFetcherCallback implements RequestCompletionCallback {
+ class ArchiveFetcherCallback implements GetCompletionCallback {
public void onSuccess(FetchResult result, ClientGetState state)
{
- parent.fetchState = SingleFileFetcher.this;
+ parent.currentState = SingleFileFetcher.this;
try {
ctx.archiveManager.extractToCache(thisKey,
ah.getArchiveType(), result.asBucket(), actx, ah);
} catch (ArchiveFailureException e) {
@@ -381,10 +383,10 @@
}
- class MultiLevelMetadataCallback implements RequestCompletionCallback {
+ class MultiLevelMetadataCallback implements GetCompletionCallback {
public void onSuccess(FetchResult result, ClientGetState state)
{
- parent.fetchState = SingleFileFetcher.this;
+ parent.currentState = SingleFileFetcher.this;
try {
metadata =
Metadata.construct(result.asBucket());
} catch (MetadataParseException e) {
@@ -410,6 +412,10 @@
// Real onFailure
private void onFailure(FetchException e, boolean forceFatal) {
+ if(parent.isCancelled() || cancelled) {
+ onFailure(new FetchException(FetchException.CANCELLED));
+ return;
+ }
if(!(e.isFatal() || forceFatal) ) {
if(retryCount <= maxRetries) {
if(parent.isCancelled()) {
@@ -453,5 +459,9 @@
public Object getToken() {
return token;
}
-
+
+ public void cancel() {
+ cancelled = true;
+ }
+
}
Added: branches/async-client/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileInserter.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SingleFileInserter.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,257 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import freenet.client.InsertBlock;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.client.Metadata;
+import freenet.keys.CHKBlock;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.FreenetURI;
+import freenet.keys.SSKBlock;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+/**
+ * Attempt to insert a file. May include metadata.
+ *
+ * This stage:
+ * Attempt to compress the file. Off-thread if it will take a while.
+ * Then hand it off to SimpleFileInserter.
+ */
+class SingleFileInserter implements ClientPutState {
+
+ // Config option???
+ private static final long COMPRESS_OFF_THREAD_LIMIT = 65536;
+
+ final ClientPut parent;
+ final InsertBlock block;
+ final InserterContext ctx;
+ final boolean metadata;
+ final PutCompletionCallback cb;
+ final boolean getCHKOnly;
+ /** If true, we are not the top level request, and should not
+ * update our parent to point to us as current put-stage. */
+ final boolean dontTellParent;
+
+ SingleFileInserter(ClientPut parent, PutCompletionCallback cb,
InsertBlock block, boolean metadata, InserterContext ctx, boolean dontCompress,
boolean dontTellParent, boolean getCHKOnly) throws InserterException {
+ this.parent = parent;
+ this.block = block;
+ this.ctx = ctx;
+ this.metadata = metadata;
+ this.cb = cb;
+ this.dontTellParent = dontTellParent;
+ this.getCHKOnly = getCHKOnly;
+ if(!dontTellParent)
+ parent.setCurrentState(this);
+ start();
+ }
+
+ private void start() throws InserterException {
+ if((!ctx.dontCompress) && block.getData().size() >
COMPRESS_OFF_THREAD_LIMIT) {
+ // Run off thread
+ OffThreadCompressor otc = new OffThreadCompressor();
+ Thread t = new Thread(otc, "Compressor for "+this);
+ t.setDaemon(true);
+ t.start();
+ } else {
+ tryCompress();
+ }
+ }
+
+ private class OffThreadCompressor implements Runnable {
+ public void run() {
+ try {
+ tryCompress();
+ } catch (InserterException e) {
+ cb.onFailure(e, SingleFileInserter.this);
+ }
+ }
+ }
+
+ private void tryCompress() throws InserterException {
+ // First, determine how small it needs to be
+ Bucket origData = block.getData();
+ Bucket data = origData;
+ int blockSize;
+ boolean dontCompress = ctx.dontCompress;
+
+ long origSize = data.size();
+ String type = block.desiredURI.getKeyType().toUpperCase();
+ if(type.equals("SSK") || type.equals("KSK")) {
+ blockSize = SSKBlock.DATA_LENGTH;
+ } else if(type.equals("CHK")) {
+ blockSize = CHKBlock.DATA_LENGTH;
+ } else {
+ throw new
InserterException(InserterException.INVALID_URI);
+ }
+
+ Compressor bestCodec = null;
+ Bucket bestCompressedData = null;
+
+ if(origSize > blockSize && (!ctx.dontCompress) &&
(!dontCompress)) {
+ // Try to compress the data.
+ // Try each algorithm, starting with the fastest and
weakest.
+ // Stop when run out of algorithms, or the compressed
data fits in a single block.
+ int algos = Compressor.countCompressAlgorithms();
+ try {
+ for(int i=0;i<algos;i++) {
+ Compressor comp =
Compressor.getCompressionAlgorithmByDifficulty(i);
+ Bucket result;
+ result = comp.compress(origData,
ctx.bf, Long.MAX_VALUE);
+ if(result.size() < blockSize) {
+ bestCodec = comp;
+ data = result;
+ if(bestCompressedData != null)
+
ctx.bf.freeBucket(bestCompressedData);
+ bestCompressedData = data;
+ break;
+ }
+ if(bestCompressedData != null &&
result.size() < bestCompressedData.size()) {
+
ctx.bf.freeBucket(bestCompressedData);
+ bestCompressedData = result;
+ data = result;
+ bestCodec = comp;
+ } else if(bestCompressedData == null &&
result.size() < data.size()) {
+ bestCompressedData = result;
+ bestCodec = comp;
+ data = result;
+ }
+ }
+ } catch (IOException e) {
+ throw new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ } catch (CompressionOutputSizeException e) {
+ // Impossible
+ throw new Error(e);
+ }
+ }
+
+ // Compressed data
+
+ // Insert it...
+ short codecNumber = bestCodec == null ? -1 :
bestCodec.codecNumberForMetadata();
+
+ if(block.getData().size() > Integer.MAX_VALUE)
+ throw new
InserterException(InserterException.INTERNAL_ERROR, "2GB+ should not encode to
one block!", null);
+
+ if((block.clientMetadata == null ||
block.clientMetadata.isTrivial())) {
+ if(data.size() < blockSize) {
+ // Just insert it
+ SingleBlockInserter bi = new
SingleBlockInserter(parent, data, codecNumber, block.desiredURI, ctx, cb,
metadata, (int)block.getData().size(), -1);
+ bi.schedule();
+ return;
+ }
+ }
+ if (data.size() < ClientCHKBlock.MAX_COMPRESSED_DATA_LENGTH) {
+ MultiPutCompletionCallback mcb =
+ new MultiPutCompletionCallback(cb, parent,
dontTellParent);
+ // Insert single block, then insert pointer to it
+ SingleBlockInserter dataPutter = new
SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx,
mcb, metadata, (int)origSize, -1);
+ Metadata meta = new Metadata(Metadata.SIMPLE_REDIRECT,
dataPutter.getURI(), block.clientMetadata);
+ Bucket metadataBucket;
+ try {
+ metadataBucket =
BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
+ } catch (IOException e) {
+ throw new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ }
+ SingleBlockInserter metaPutter = new
SingleBlockInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx,
mcb, true, (int)origSize, -1);
+ mcb.add(metaPutter);
+ mcb.add(dataPutter);
+ mcb.arm();
+ dataPutter.schedule();
+ metaPutter.schedule();
+ return;
+ }
+ // Otherwise the file is too big to fit into one block
+ // We therefore must make a splitfile
+ // Job of SplitHandler: when the splitinserter has the metadata,
+ // insert it. Then when the splitinserter has finished, and the
+ // metadata insert has finished too, tell the master callback.
+ SplitHandler sh = new SplitHandler();
+ SplitFileInserter sfi = new SplitFileInserter(parent, sh, data,
bestCodec, block.clientMetadata, ctx, sh, getCHKOnly, metadata, true);
+ sh.sfi = sfi;
+ if(!dontTellParent)
+ parent.setCurrentState(sh);
+ sfi.start();
+ return;
+ }
+
+ /**
+ * When we get the metadata, start inserting it to our target key.
+ * When we have inserted both the metadata and the splitfile,
+ * call the master callback.
+ */
+ class SplitHandler implements SplitPutCompletionCallback,
ClientPutState {
+
+ SplitFileInserter sfi;
+ SingleFileInserter metadataPutter;
+ boolean finished = false;
+ boolean splitInsertSuccess = false;
+ boolean metaInsertSuccess = false;
+
+ public synchronized void onSuccess(ClientPutState state) {
+ if(finished) return;
+ if(state == sfi)
+ splitInsertSuccess = true;
+ else if(state == metadataPutter)
+ metaInsertSuccess = true;
+ if(splitInsertSuccess && metaInsertSuccess)
+ cb.onSuccess(this);
+ }
+
+ public synchronized void onFailure(InserterException e,
ClientPutState state) {
+ if(finished) return;
+ fail(e);
+ }
+
+ public void onGeneratedMetadata(Metadata meta) {
+ if(finished) return;
+ synchronized(this) {
+ Bucket metadataBucket;
+ try {
+ metadataBucket =
BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
+ } catch (IOException e) {
+ InserterException ex = new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ fail(ex);
+ return;
+ }
+ InsertBlock newBlock = new
InsertBlock(metadataBucket, null, block.desiredURI);
+ try {
+ metadataPutter = new
SingleFileInserter(parent, this, newBlock, true, ctx, false, getCHKOnly, true);
+ } catch (InserterException e) {
+ cb.onFailure(e, this);
+ return;
+ }
+ }
+ try {
+ metadataPutter.start();
+ } catch (InserterException e) {
+ fail(e);
+ return;
+ }
+ }
+
+ private synchronized void fail(InserterException e) {
+ if(finished) return;
+ finished = true;
+ cb.onFailure(e, this);
+ }
+
+ public ClientPut getParent() {
+ return parent;
+ }
+
+ public void onEncode(ClientKey key) {
+ // Ignore
+ }
+
+ }
+
+ public ClientPut getParent() {
+ return parent;
+ }
+}
Modified: branches/async-client/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcher.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SplitFileFetcher.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -21,7 +21,7 @@
import freenet.support.compress.Compressor;
/**
- * Fetch a splitfile, decompress it if need be, and return it to the
RequestCompletionCallback.
+ * Fetch a splitfile, decompress it if need be, and return it to the
GetCompletionCallback.
* Most of the work is done by the segments, and we do not need a thread.
*/
public class SplitFileFetcher extends ClientGetState {
@@ -31,7 +31,7 @@
final LinkedList decompressors;
final ClientMetadata clientMetadata;
final ClientGet parent;
- final RequestCompletionCallback cb;
+ final GetCompletionCallback cb;
final int recursionLevel;
/** The splitfile type. See the SPLITFILE_ constants on Metadata. */
final short splitfileType;
@@ -57,7 +57,7 @@
private final boolean splitUseLengths;
private boolean finished;
- public SplitFileFetcher(Metadata metadata, RequestCompletionCallback
rcb, ClientGet parent,
+ public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb,
ClientGet parent,
FetcherContext newCtx, LinkedList decompressors,
ClientMetadata clientMetadata,
ArchiveContext actx, int recursionLevel) throws
FetchException, MetadataParseException {
this.finished = false;
@@ -235,4 +235,10 @@
fetchContext.eventProducer.produceEvent(new
SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks,
fatallyFailedBlocks, runningBlocks));
}
+ public void schedule() {
+ for(int i=0;i<segments.length;i++) {
+ segments[i].schedule();
+ }
+ }
+
}
Modified:
branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -22,7 +22,7 @@
* A single segment within a SplitFileFetcher.
* This in turn controls a large number of SingleFileFetcher's.
*/
-public class SplitFileFetcherSegment implements RequestCompletionCallback {
+public class SplitFileFetcherSegment implements GetCompletionCallback {
final short splitfileType;
final FreenetURI[] dataBlocks;
@@ -83,22 +83,6 @@
blockFetchContext = new FetcherContext(fetcherContext,
FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
this.recursionLevel = 0;
}
-
- try {
- for(int i=0;i<dataBlocks.length;i++) {
- dataBlockStatus[i] =
- new
SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i],
blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries,
recursionLevel, true, new Integer(i));
- dataBlockStatus[i].schedule();
- }
- for(int i=0;i<checkBlocks.length;i++) {
- checkBlockStatus[i] =
- new
SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i],
blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries,
recursionLevel, true, new Integer(dataBlocks.length+i));
- checkBlockStatus[i].schedule();
- }
- } catch (MalformedURLException e) {
- // Invalidates the whole splitfile
- throw new FetchException(FetchException.INVALID_URI,
"Invalid URI in splitfile");
- }
}
public boolean isFinished() {
@@ -293,6 +277,48 @@
errors.merge(e.errorCodes);
else
errors.inc(new Integer(e.mode),
((SingleFileFetcher)state).getRetryCount());
+ if(failedBlocks + fatallyFailedBlocks > (dataBlocks.length +
checkBlocks.length - minFetched)) {
+ fail(new FetchException(FetchException.SPLITFILE_ERROR,
errors));
+ }
}
+ private void fail(FetchException e) {
+ synchronized(this) {
+ if(finished) return;
+ finished = true;
+ this.failureException = e;
+ }
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ SingleFileFetcher f = dataBlockStatus[i];
+ if(f != null)
+ f.cancel();
+ }
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ SingleFileFetcher f = dataBlockStatus[i];
+ if(f != null)
+ f.cancel();
+ }
+ parentFetcher.segmentFinished(this);
+ }
+
+ public void schedule() {
+ try {
+ for(int i=0;i<dataBlocks.length;i++) {
+ dataBlockStatus[i] =
+ new
SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i],
blockFetchContext, archiveContext, blockFetchContext.maxSplitfileBlockRetries,
recursionLevel, true, new Integer(i));
+ dataBlockStatus[i].schedule();
+ }
+ for(int i=0;i<checkBlocks.length;i++) {
+ checkBlockStatus[i] =
+ new
SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i],
blockFetchContext, archiveContext, blockFetchContext.maxSplitfileBlockRetries,
recursionLevel, true, new Integer(dataBlocks.length+i));
+ checkBlockStatus[i].schedule();
+ }
+ } catch (MalformedURLException e) {
+ // Invalidates the whole splitfile
+ fail(new FetchException(FetchException.INVALID_URI,
"Invalid URI in splitfile"));
+ } catch (Throwable t) {
+ fail(new FetchException(FetchException.INVALID_URI, t));
+ }
+ }
+
}
Added: branches/async-client/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileInserter.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SplitFileInserter.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,214 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.util.Vector;
+
+import freenet.client.ClientMetadata;
+import freenet.client.FECCodec;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.client.Metadata;
+import freenet.client.async.SingleFileInserter.SplitHandler;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;
+import freenet.support.compress.Compressor;
+
+public class SplitFileInserter implements ClientPutState {
+
+ final ClientPut parent;
+ final InserterContext ctx;
+ final SplitPutCompletionCallback cb;
+ final long dataLength;
+ final short compressionCodec;
+ final short splitfileAlgorithm;
+ final int segmentSize;
+ final int checkSegmentSize;
+ final SplitFileInserterSegment[] segments;
+ final boolean getCHKOnly;
+ final int countCheckBlocks;
+ final int countDataBlocks;
+ private boolean haveSentMetadata;
+ final ClientMetadata cm;
+ final boolean isMetadata;
+ private boolean finished;
+
+ public SplitFileInserter(ClientPut put, SplitPutCompletionCallback cb,
Bucket data, Compressor bestCodec, ClientMetadata clientMetadata,
InserterContext ctx, SplitHandler sh, boolean getCHKOnly, boolean isMetadata,
boolean dontTellParent) throws InserterException {
+ this.parent = put;
+ if(!dontTellParent)
+ parent.setCurrentState(this);
+ this.finished = false;
+ this.isMetadata = isMetadata;
+ this.cm = clientMetadata;
+ this.getCHKOnly = getCHKOnly;
+ this.cb = cb;
+ this.ctx = ctx;
+ Bucket[] dataBuckets;
+ try {
+ dataBuckets = BucketTools.split(data,
ClientCHKBlock.DATA_LENGTH, ctx.bf);
+ } catch (IOException e) {
+ throw new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ }
+ countDataBlocks = dataBuckets.length;
+ // Encoding is done by segments
+ if(bestCodec == null)
+ compressionCodec = -1;
+ else
+ compressionCodec = bestCodec.codecNumberForMetadata();
+ this.splitfileAlgorithm = ctx.splitfileAlgorithm;
+ this.dataLength = data.size();
+ segmentSize = ctx.splitfileSegmentDataBlocks;
+ checkSegmentSize = splitfileAlgorithm ==
Metadata.SPLITFILE_NONREDUNDANT ? 0 : ctx.splitfileSegmentCheckBlocks;
+
+ // Create segments
+ segments = splitIntoSegments(segmentSize, dataBuckets);
+ int count = 0;
+ for(int i=0;i<segments.length;i++)
+ count += segments[i].countCheckBlocks();
+ countCheckBlocks = count;
+ }
+
+ /**
+ * Group the blocks into segments.
+ */
+ private SplitFileInserterSegment[] splitIntoSegments(int segmentSize,
Bucket[] origDataBlocks) {
+ int dataBlocks = origDataBlocks.length;
+
+ Vector segs = new Vector();
+
+ FECCodec codec = FECCodec.getCodec(splitfileAlgorithm,
origDataBlocks.length);
+
+ // First split the data up
+ if(dataBlocks < segmentSize || segmentSize == -1) {
+ // Single segment
+ SplitFileInserterSegment onlySeg = new
SplitFileInserterSegment(this, codec, origDataBlocks, ctx, getCHKOnly, 0);
+ segs.add(onlySeg);
+ } else {
+ int j = 0;
+ int segNo = 0;
+ for(int i=segmentSize;;i+=segmentSize) {
+ if(i > dataBlocks) i = dataBlocks;
+ Bucket[] seg = new Bucket[i-j];
+ System.arraycopy(origDataBlocks, j, seg, 0,
i-j);
+ j = i;
+ for(int x=0;x<seg.length;x++)
+ if(seg[x] == null) throw new
NullPointerException("In splitIntoSegs: "+x+" is null of "+seg.length+" of
"+segNo);
+ SplitFileInserterSegment s = new
SplitFileInserterSegment(this, codec, seg, ctx, getCHKOnly, segNo);
+ segs.add(s);
+
+ if(i == dataBlocks) break;
+ segNo++;
+ }
+ }
+ return (SplitFileInserterSegment[]) segs.toArray(new
SplitFileInserterSegment[segs.size()]);
+ }
+
+ public void start() {
+ for(int i=0;i<segments.length;i++)
+ segments[i].start();
+ }
+
+ public void encodedSegment(SplitFileInserterSegment segment) {
+ Logger.minor(this, "Encoded segment "+segment.segNo+" of
"+this);
+ }
+
+ public void segmentHasURIs(SplitFileInserterSegment segment) {
+ if(haveSentMetadata) {
+ Logger.error(this, "WTF? Already sent metadata");
+ return;
+ }
+
+ boolean allHaveURIs = true;
+ synchronized(this) {
+ for(int i=0;i<segments.length;i++) {
+ if(!segments[i].isEncoded())
+ allHaveURIs = false;
+ }
+ }
+
+ if(allHaveURIs) {
+ boolean missingURIs;
+ Metadata m = null;
+ synchronized(this) {
+ // Create metadata
+ FreenetURI[] dataURIs = getDataURIs();
+ FreenetURI[] checkURIs = getCheckURIs();
+
+ Logger.minor(this, "Data URIs:
"+dataURIs.length+", check URIs: "+checkURIs.length);
+
+ missingURIs = anyNulls(dataURIs) ||
anyNulls(checkURIs);
+
+ if(!missingURIs) {
+ // Create Metadata
+ Metadata metadata = new
Metadata(splitfileAlgorithm, dataURIs, checkURIs, segmentSize,
checkSegmentSize, cm, dataLength, compressionCodec, isMetadata);
+ }
+ haveSentMetadata = true;
+ }
+ if(missingURIs) {
+ // Error
+ fail(new
InserterException(InserterException.INTERNAL_ERROR, "Missing URIs after
encoding", null));
+ return;
+ } else
+ cb.onGeneratedMetadata(m);
+ }
+
+ }
+
+ private void fail(InserterException e) {
+ synchronized(this) {
+ if(finished) return;
+ finished = true;
+ }
+ cb.onFailure(e, this);
+ }
+
+ // FIXME move this to somewhere
+ private static boolean anyNulls(Object[] array) {
+ for(int i=0;i<array.length;i++)
+ if(array[i] == null) return true;
+ return false;
+ }
+
+ private FreenetURI[] getCheckURIs() {
+ // Copy check blocks from each segment into a FreenetURI[].
+ FreenetURI[] uris = new FreenetURI[countCheckBlocks];
+ int x = 0;
+ for(int i=0;i<segments.length;i++) {
+ FreenetURI[] segURIs = segments[i].getCheckURIs();
+ if(x + segURIs.length > countCheckBlocks)
+ throw new IllegalStateException("x="+x+",
segURIs="+segURIs.length+", countCheckBlocks="+countCheckBlocks);
+ System.arraycopy(segURIs, 0, uris, x, segURIs.length);
+ x += segURIs.length;
+ }
+
+ if(uris.length != x)
+ throw new IllegalStateException("Total is wrong");
+
+ return uris;
+ }
+
+ private FreenetURI[] getDataURIs() {
+ // Copy check blocks from each segment into a FreenetURI[].
+ FreenetURI[] uris = new FreenetURI[countDataBlocks];
+ int x = 0;
+ for(int i=0;i<segments.length;i++) {
+ FreenetURI[] segURIs = segments[i].getDataURIs();
+ if(x + segURIs.length > countDataBlocks)
+ throw new IllegalStateException("x="+x+",
segURIs="+segURIs.length+", countDataBlocks="+countDataBlocks);
+ System.arraycopy(segURIs, 0, uris, x, segURIs.length);
+ x += segURIs.length;
+ }
+
+ if(uris.length != x)
+ throw new IllegalStateException("Total is wrong");
+
+ return uris;
+ }
+
+ public ClientPut getParent() {
+ return parent;
+ }
+
+}
Added:
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
---
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
2006-01-21 20:51:10 UTC (rev 7897)
+++
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,130 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import freenet.client.FECCodec;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public class SplitFileInserterSegment implements PutCompletionCallback {
+
+ final SplitFileInserter parent;
+ final FECCodec splitfileAlgo;
+ final Bucket[] dataBlocks;
+ final Bucket[] checkBlocks;
+ final FreenetURI[] dataURIs;
+ final FreenetURI[] checkURIs;
+ final SingleBlockInserter[] dataBlockInserters;
+ final SingleBlockInserter[] checkBlockInserters;
+ final InserterContext blockInsertContext;
+ final int segNo;
+ private boolean encoded;
+ private boolean finished;
+ private InserterException toThrow;
+
+ public SplitFileInserterSegment(SplitFileInserter parent, FECCodec
splitfileAlgo, Bucket[] origDataBlocks, InserterContext blockInsertContext,
boolean getCHKOnly, int segNo) {
+ this.parent = parent;
+ this.blockInsertContext = blockInsertContext;
+ this.splitfileAlgo = splitfileAlgo;
+ this.dataBlocks = origDataBlocks;
+ int checkBlockCount = splitfileAlgo == null ? 0 :
splitfileAlgo.countCheckBlocks();
+ checkBlocks = new Bucket[checkBlockCount];
+ checkURIs = new FreenetURI[checkBlockCount];
+ dataURIs = new FreenetURI[origDataBlocks.length];
+ dataBlockInserters = new SingleBlockInserter[dataBlocks.length];
+ checkBlockInserters = new
SingleBlockInserter[checkBlocks.length];
+ this.segNo = segNo;
+ }
+
+ public void start() {
+ if(splitfileAlgo == null) {
+ // Don't need to encode blocks
+ } else {
+ // Encode blocks
+ Thread t = new Thread(new EncodeBlocksRunnable(),
"Blocks encoder");
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
+ private class EncodeBlocksRunnable implements Runnable {
+
+ public void run() {
+ encode();
+ }
+ }
+
+ void encode() {
+ try {
+ splitfileAlgo.encode(dataBlocks, checkBlocks,
ClientCHKBlock.DATA_LENGTH, blockInsertContext.bf);
+ // Success! Start the fetches.
+ encoded = true;
+ parent.encodedSegment(this);
+ // Start the inserts
+ for(int i=0;i<dataBlockInserters.length;i++)
+ dataBlockInserters[i] =
+ new SingleBlockInserter(parent.parent,
dataBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
false, ClientCHKBlock.DATA_LENGTH, i);
+ for(int i=0;i<checkBlockInserters.length;i++)
+ checkBlockInserters[i] =
+ new SingleBlockInserter(parent.parent,
checkBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
false, ClientCHKBlock.DATA_LENGTH, i + dataBlocks.length);
+ } catch (IOException e) {
+ InserterException ex =
+ new
InserterException(InserterException.BUCKET_ERROR, e, null);
+ finish(ex);
+ } catch (Throwable t) {
+ InserterException ex =
+ new
InserterException(InserterException.INTERNAL_ERROR, t, null);
+ finish(ex);
+ }
+ }
+
+ private void finish(InserterException ex) {
+ synchronized(this) {
+ if(finished) return;
+ finished = true;
+ toThrow = ex;
+ }
+ parent.segmentFinished(this);
+ }
+
+ public void onSuccess(ClientPutState state) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void onFailure(InserterException e, ClientPutState state) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public boolean isEncoded() {
+ return encoded;
+ }
+
+ public int countCheckBlocks() {
+ return checkBlocks.length;
+ }
+
+ public FreenetURI[] getCheckURIs() {
+ return checkURIs;
+ }
+
+ public FreenetURI[] getDataURIs() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void onEncode(ClientKey key) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added:
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java
===================================================================
---
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java
2006-01-21 20:51:10 UTC (rev 7897)
+++
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,9 @@
+package freenet.client.async;
+
+import freenet.client.Metadata;
+
+public interface SplitPutCompletionCallback extends PutCompletionCallback {
+
+ public void onGeneratedMetadata(Metadata meta);
+
+}
Added: branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java
===================================================================
--- branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,45 @@
+package freenet.node.fcp;
+
+import freenet.client.FailureCodeTracker;
+import freenet.client.FetchException;
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class FetchErrorMessage extends FCPMessage {
+
+ final int code;
+ final String codeDescription;
+ final String extraDescription;
+ final FailureCodeTracker tracker;
+ final boolean isFatal;
+
+ public FetchErrorMessage(FCPConnectionHandler handler, FetchException
e, String identifier) {
+ this.tracker = e.errorCodes;
+ this.code = e.mode;
+ this.codeDescription = FetchException.getMessage(code);
+ this.extraDescription = e.extraMessage;
+ this.isFatal = e.isFatal();
+ }
+
+ public SimpleFieldSet getFieldSet() {
+ SimpleFieldSet sfs = new SimpleFieldSet();
+ sfs.put("Code", Integer.toHexString(code));
+ sfs.put("CodeDescription", codeDescription);
+ if(extraDescription != null)
+ sfs.put("ExtraDescription", extraDescription);
+ sfs.put("Fatal", Boolean.toString(isFatal));
+ if(tracker != null) {
+ tracker.copyToFieldSet(sfs, "Errors.");
+ }
+ return sfs;
+ }
+
+ public String getName() {
+ return "FetchError";
+ }
+
+ public void run(FCPConnectionHandler handler, Node node) throws
MessageInvalidException {
+ throw new
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "FetchError goes
from server to client not the other way around");
+ }
+
+}