Author: xor
Date: 2008-11-16 18:06:27 +0000 (Sun, 16 Nov 2008)
New Revision: 23654
Modified:
trunk/plugins/WoT/IdentityInserter.java
trunk/plugins/WoT/introduction/IntroductionClient.java
trunk/plugins/WoT/introduction/IntroductionServer.java
Log:
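Reduce locking to fix fetching & inserting: the client callbacks, cancelInserts()/cancelRequests() and downloadPuzzle() are no longer synchronized methods; they now lock only the specific collection (mInserts, mRequests, mIdentities) they access. There must have been a deadlock.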
Modified: trunk/plugins/WoT/IdentityInserter.java
===================================================================
--- trunk/plugins/WoT/IdentityInserter.java 2008-11-16 18:01:45 UTC (rev
23653)
+++ trunk/plugins/WoT/IdentityInserter.java 2008-11-16 18:06:27 UTC (rev
23654)
@@ -62,7 +62,7 @@
private volatile boolean isRunning;
private Thread mThread;
- private final ArrayList<ClientPutter> mInserts = new
ArrayList<ClientPutter>(10); /* Just assume that there are 10 identities */
+ private final ArrayList<BaseClientPutter> mInserts = new
ArrayList<BaseClientPutter>(10); /* Just assume that there are 10 identities */
/**
* Creates an IdentityInserter.
@@ -96,6 +96,7 @@
while(isRunning) {
Thread.interrupted();
Logger.debug(this, "IdentityInserter loop running...");
+ cancelInserts(); /* FIXME: check whether this does not
prevent the cancelled inserts from being restarted in the loop right now */
ObjectSet<OwnIdentity> identities =
OwnIdentity.getAllOwnIdentities(db);
while(identities.hasNext()) {
OwnIdentity identity = identities.next();
@@ -125,12 +126,14 @@
Logger.debug(this, "Identity inserter thread finished.");
}
- private synchronized void cancelInserts() {
- Iterator<ClientPutter> i = mInserts.iterator();
- int icounter = 0;
- Logger.debug(this, "Trying to stop all inserts");
- while (i.hasNext()) { i.next().cancel(); ++icounter; }
- Logger.debug(this, "Stopped " + icounter + " current inserts");
+ private void cancelInserts() {
+ Logger.debug(this, "Trying to stop all inserts");
+ synchronized(mInserts) {
+ Iterator<BaseClientPutter> i = mInserts.iterator();
+ int icounter = 0;
+ while (i.hasNext()) { i.next().cancel(); ++icounter; }
+ Logger.debug(this, "Stopped " + icounter + " current
inserts");
+ }
}
/**
@@ -190,7 +193,9 @@
/* FIXME: are these parameters correct? */
ClientPutter pu = client.insert(ib, false,
"identity.xml", false, ictx, this);
pu.setPriorityClass(RequestStarter.UPDATE_PRIORITY_CLASS);
- mInserts.add(pu);
+ synchronized(mInserts) {
+ mInserts.add(pu);
+ }
tempB = null;
// We set the date now, so if the identity is modified
during the insert, we'll insert it again next time
@@ -208,9 +213,18 @@
os.close();
}
}
-
+
+ private void removeInsert(BaseClientPutter p) {
+ Logger.debug(this, "Trying to remove insert " + p.getURI());
+ synchronized(mInserts) {
+ p.cancel(); /* FIXME: is this necessary ? */
+ mInserts.remove(p);
+ }
+ Logger.debug(this, "Removed request.");
+ }
+
// Only called by inserts
- public synchronized void onSuccess(BaseClientPutter state)
+ public void onSuccess(BaseClientPutter state)
{
try {
OwnIdentity identity = OwnIdentity.getByURI(db,
state.getURI());
@@ -220,23 +234,21 @@
db.commit();
Logger.debug(this, "Successful insert of identity '" +
identity.getNickName() + "'");
} catch(Exception e) { Logger.error(this, "Error", e); }
- state.cancel(); /* FIXME: is this necessary */
- mInserts.remove(state);
+ removeInsert(state);
}
// Only called by inserts
- public synchronized void onFailure(InsertException e, BaseClientPutter
state)
+ public void onFailure(InsertException e, BaseClientPutter state)
{
try {
OwnIdentity identity = OwnIdentity.getByURI(db,
state.getURI());
Logger.error(this, "Error during insert of identity '"
+ identity.getNickName() + "'", e);
} catch(Exception ex) { Logger.error(this, "Error", e); }
- state.cancel(); /* FIXME: is this necessary */
- mInserts.remove(state);
+ removeInsert(state);
}
- /* Not needed functions from the ClientCallback inteface */
+ /* Not needed functions from the ClientCallback interface */
public void onFailure(FetchException e, ClientGetter state) {
// TODO Auto-generated method stub
Modified: trunk/plugins/WoT/introduction/IntroductionClient.java
===================================================================
--- trunk/plugins/WoT/introduction/IntroductionClient.java 2008-11-16
18:01:45 UTC (rev 23653)
+++ trunk/plugins/WoT/introduction/IntroductionClient.java 2008-11-16
18:06:27 UTC (rev 23654)
@@ -88,7 +88,7 @@
private final ArrayBlockingQueue<Identity> mIdentities = new
ArrayBlockingQueue<Identity>(PUZZLE_POOL_SIZE); /* FIXME: figure out whether my
assumption that this is just the right size is correct */
private final HashSet<ClientGetter> mRequests = new
HashSet<ClientGetter>(PUZZLE_REQUEST_COUNT * 2); /* TODO: profile & tweak */
- private final HashSet<ClientPutter> mInserts = new
HashSet<ClientPutter>(PUZZLE_REQUEST_COUNT * 2);
+ private final HashSet<BaseClientPutter> mInserts = new
HashSet<BaseClientPutter>(PUZZLE_REQUEST_COUNT * 2);
/**
* Creates an IntroductionServer
@@ -201,7 +201,9 @@
/* FIXME: are these parameters correct? */
ClientPutter pu = mClient.insert(ib, false, null,
false, ictx, this);
pu.setPriorityClass(RequestStarter.UPDATE_PRIORITY_CLASS);
- mInserts.add(pu);
+ synchronized(mInserts) {
+ mInserts.add(pu);
+ }
tempB = null;
Logger.debug(this, "Started to insert puzzle solution
of " + solver.getNickName() + " at " + solutionURI);
@@ -218,16 +220,22 @@
}
}
- private synchronized void cancelRequests() {
- Iterator<ClientGetter> r = mRequests.iterator();
- Iterator<ClientPutter> i = mInserts.iterator();
- int rcounter = 0;
- int icounter = 0;
- Logger.debug(this, "Trying to stop all requests & inserts");
- while (r.hasNext()) { r.next().cancel(); ++rcounter; }
- while (i.hasNext()) { i.next().cancel(); ++icounter; }
- Logger.debug(this, "Stopped " + rcounter + " current requests");
- Logger.debug(this, "Stopped " + icounter + " current inserts");
+ private void cancelRequests() {
+ Logger.debug(this, "Trying to stop all requests & inserts");
+
+ synchronized(mRequests) {
+ Iterator<ClientGetter> r = mRequests.iterator();
+ int rcounter = 0;
+ while (r.hasNext()) { r.next().cancel(); ++rcounter; }
+ Logger.debug(this, "Stopped " + rcounter + " current
requests");
+ }
+
+ synchronized(mInserts) {
+ Iterator<BaseClientPutter> i = mInserts.iterator();
+ int icounter = 0;
+ while (i.hasNext()) { i.next().cancel(); ++icounter; }
+ Logger.debug(this, "Stopped " + icounter + " current
inserts");
+ }
}
private synchronized void downloadPuzzles() {
@@ -240,16 +248,18 @@
ArrayList<Identity> ids = new
ArrayList<Identity>(PUZZLE_POOL_SIZE);
int counter = 0;
- for(Identity i : allIds) {
- /* TODO: Create a "boolean providesIntroduction" in
Identity to use a database query instead of this */
-
if(i.hasContext(IntroductionPuzzle.INTRODUCTION_CONTEXT) &&
!mIdentities.contains(i)
- && i.getBestScore(db) >
MINIMUM_SCORE_FOR_PUZZLE_DOWNLOAD) {
- ids.add(i);
- ++counter;
+ synchronized(mIdentities) {
+ for(Identity i : allIds) {
+ /* TODO: Create a "boolean
providesIntroduction" in Identity to use a database query instead of this */
+
if(i.hasContext(IntroductionPuzzle.INTRODUCTION_CONTEXT) &&
!mIdentities.contains(i)
+ && i.getBestScore(db) >
MINIMUM_SCORE_FOR_PUZZLE_DOWNLOAD) {
+ ids.add(i);
+ ++counter;
+ }
+
+ if(counter == PUZZLE_REQUEST_COUNT)
+ break;
}
-
- if(counter == PUZZLE_REQUEST_COUNT)
- break;
}
if(counter == 0) {
@@ -282,7 +292,7 @@
}
- private synchronized void downloadPuzzle(Identity identity, int index)
throws FetchException {
+ private void downloadPuzzle(Identity identity, int index) throws
FetchException {
FreenetURI uri =
IntroductionPuzzle.generateRequestURI(identity, new Date(), index);
FetchContext fetchContext = mClient.getFetchContext();
@@ -290,38 +300,57 @@
fetchContext.maxNonSplitfileRetries = -1; // retry forever
ClientGetter g = mClient.fetch(uri, -1, this, this,
fetchContext);
g.setPriorityClass(RequestStarter.UPDATE_PRIORITY_CLASS); /*
FIXME: decide which one to use */
- mRequests.add(g);
- if(!mIdentities.contains(identity)) {
- mIdentities.poll();
- try {
- mIdentities.put(identity);
- } catch(InterruptedException e) {}
+ synchronized(mRequests) {
+ mRequests.add(g);
}
+ synchronized(mIdentities) {
+ if(!mIdentities.contains(identity)) {
+ mIdentities.poll();
+ try {
+ mIdentities.put(identity);
+ } catch(InterruptedException e) {}
+ }
+ }
Logger.debug(this, "Trying to fetch puzzle from " +
uri.toString());
}
-
+ private void removeRequest(ClientGetter g) {
+ Logger.debug(this, "Trying to remove request " + g.getURI());
+ synchronized(mRequests) {
+ g.cancel();
+ mRequests.remove(g);
+ }
+ Logger.debug(this, "Removed request.");
+ }
+
+ private void removeInsert(BaseClientPutter p) {
+ Logger.debug(this, "Trying to remove insert " + p.getURI());
+ synchronized(mInserts) {
+ p.cancel();
+ mInserts.remove(p);
+ }
+ Logger.debug(this, "Removed request.");
+ }
+
/**
* Called when the node can't fetch a file OR when there is a newer
edition.
* In our case, called when there is no puzzle available.
*/
- public synchronized void onFailure(FetchException e, ClientGetter
state) {
+ public void onFailure(FetchException e, ClientGetter state) {
Logger.normal(this, "Downloading puzzle " + state.getURI() + "
failed.", e);
-
- mRequests.remove(state);
+ removeRequest(state);
}
/**
* Called when a puzzle is successfully fetched.
*/
- public synchronized void onSuccess(FetchResult result, ClientGetter
state) {
+ public void onSuccess(FetchResult result, ClientGetter state) {
Logger.debug(this, "Fetched puzzle: " + state.getURI());
try {
IntroductionPuzzle p =
IntroductionPuzzle.importFromXML(db, result.asBucket().getInputStream(),
state.getURI());
IntroductionPuzzle.deleteOldestPuzzles(db,
PUZZLE_POOL_SIZE);
- state.cancel(); /* FIXME: is this necessary */
- mRequests.remove(state);
+ removeRequest(state);
if(p.getIndex() < MAX_PUZZLES_PER_IDENTITY) {
downloadPuzzle(p.getInserter(), p.getIndex() +
1);
}
@@ -331,22 +360,20 @@
}
// Only called by inserts
- public synchronized void onSuccess(BaseClientPutter state)
+ public void onSuccess(BaseClientPutter state)
{
Logger.debug(this, "Successful insert of puzzle solution at " +
state.getURI());
- state.cancel(); /* FIXME: is this necessary */
- mInserts.remove(state);
+ removeInsert(state);
}
// Only called by inserts
- public synchronized void onFailure(InsertException e, BaseClientPutter
state)
+ public void onFailure(InsertException e, BaseClientPutter state)
{
Logger.debug(this, "Insert of puzzle solution failed for " +
state.getURI(), e);
- state.cancel(); /* FIXME: is this necessary */
- mInserts.remove(state);
+ removeInsert(state);
}
- /* Not needed functions from the ClientCallback inteface */
+ /* Not needed functions from the ClientCallback interface */
// Only called by inserts
public void onFetchable(BaseClientPutter state) {}
Modified: trunk/plugins/WoT/introduction/IntroductionServer.java
===================================================================
--- trunk/plugins/WoT/introduction/IntroductionServer.java 2008-11-16
18:01:45 UTC (rev 23653)
+++ trunk/plugins/WoT/introduction/IntroductionServer.java 2008-11-16
18:06:27 UTC (rev 23654)
@@ -74,7 +74,7 @@
private final ArrayList<ClientGetter> mRequests = new
ArrayList<ClientGetter>(PUZZLE_COUNT * 5); /* Just assume that there are 5
identities */
- private final ArrayList<ClientPutter> mInserts = new
ArrayList<ClientPutter>(PUZZLE_COUNT * 5); /* Just assume that there are 5
identities */
+ private final ArrayList<BaseClientPutter> mInserts = new
ArrayList<BaseClientPutter>(PUZZLE_COUNT * 5); /* Just assume that there are 5
identities */
/**
@@ -159,16 +159,22 @@
Logger.debug(this, "Stopped the introduction server.");
}
- private synchronized void cancelRequests() {
- Iterator<ClientGetter> r = mRequests.iterator();
- Iterator<ClientPutter> i = mInserts.iterator();
- int rcounter = 0;
- int icounter = 0;
- Logger.debug(this, "Trying to stop all requests & inserts");
- while (r.hasNext()) { r.next().cancel(); ++rcounter; }
- while (i.hasNext()) { i.next().cancel(); ++icounter; }
- Logger.debug(this, "Stopped " + rcounter + " current requests");
- Logger.debug(this, "Stopped " + icounter + " current inserts");
+ private void cancelRequests() {
+ Logger.debug(this, "Trying to stop all requests & inserts");
+
+ synchronized(mRequests) {
+ Iterator<ClientGetter> r = mRequests.iterator();
+ int rcounter = 0;
+ while (r.hasNext()) { r.next().cancel(); ++rcounter; }
+ Logger.debug(this, "Stopped " + rcounter + " current
requests");
+ }
+
+ synchronized(mInserts) {
+ Iterator<BaseClientPutter> i = mInserts.iterator();
+ int icounter = 0;
+ while (i.hasNext()) { i.next().cancel(); ++icounter; }
+ Logger.debug(this, "Stopped " + icounter + " current
inserts");
+ }
}
private synchronized void downloadSolutions(OwnIdentity identity)
throws FetchException {
@@ -179,9 +185,11 @@
/* TODO: We restart all requests in every iteration. Decide
whether this makes sense or not, if not add code to re-use requests for
* puzzles which still exist.
* I think it makes sense to restart them because there are not
many puzzles and therefore not many requests. */
- for(int idx=0; idx < mRequests.size(); ++idx) {
- mRequests.get(idx).cancel();
- mRequests.remove(idx);
+ synchronized(mRequests) {
+ for(int idx=0; idx < mRequests.size(); ++idx) {
+ mRequests.get(idx).cancel();
+ mRequests.remove(idx);
+ }
}
for(IntroductionPuzzle p : puzzles) {
@@ -190,7 +198,9 @@
fetchContext.maxNonSplitfileRetries = -1; // retry
forever
ClientGetter g = mClient.fetch(p.getSolutionURI(), -1,
this, this, fetchContext);
g.setPriorityClass(RequestStarter.UPDATE_PRIORITY_CLASS); /* FIXME: decide
which one to use */
- mRequests.add(g);
+ synchronized(mRequests) {
+ mRequests.add(g);
+ }
Logger.debug(this, "Trying to fetch captcha solution
for " + p.getRequestURI() + " at " + p.getSolutionURI().toString());
}
@@ -235,7 +245,9 @@
/* FIXME: are these parameters correct?
*/
ClientPutter pu = mClient.insert(ib,
false, null, false, ictx, this);
pu.setPriorityClass(RequestStarter.UPDATE_PRIORITY_CLASS);
- mInserts.add(pu);
+ synchronized(mInserts) {
+ mInserts.add(pu);
+ }
tempB = null;
db.store(p);
@@ -261,63 +273,82 @@
}
}
+ private void removeRequest(ClientGetter g) {
+ Logger.debug(this, "Trying to remove request " + g.getURI());
+ synchronized(mRequests) {
+ g.cancel(); /* FIXME: is this necessary ? */
+ mRequests.remove(g);
+ }
+ Logger.debug(this, "Removed request.");
+ }
+
+ private void removeInsert(BaseClientPutter p) {
+ Logger.debug(this, "Trying to remove insert " + p.getURI());
+ synchronized(mInserts) {
+ p.cancel(); /* FIXME: is this necessary ? */
+ mInserts.remove(p);
+ }
+ Logger.debug(this, "Removed request.");
+ }
+
/**
* Called when the node can't fetch a file OR when there is a newer
edition.
* In our case, called when there is no solution to a puzzle in the
network.
*/
- public synchronized void onFailure(FetchException e, ClientGetter
state) {
+ public void onFailure(FetchException e, ClientGetter state) {
Logger.normal(this, "Downloading puzzle solution " +
state.getURI() + " failed: ", e);
-
- mRequests.remove(state);
+ removeRequest(state);
}
/**
* Called when a file is successfully fetched. We then add the identity
which
* solved the puzzle.
*/
- public synchronized void onSuccess(FetchResult result, ClientGetter
state) {
+ public void onSuccess(FetchResult result, ClientGetter state) {
Logger.debug(this, "Fetched puzzle solution: " +
state.getURI());
try {
db.commit();
IntroductionPuzzle p = IntroductionPuzzle.getByURI(db,
state.getURI());
- OwnIdentity puzzleOwner = (OwnIdentity)p.getInserter();
- Identity newIdentity =
Identity.importIntroductionFromXML(db, mIdentityFetcher,
result.asBucket().getInputStream());
- puzzleOwner.setTrust(db, newIdentity, (byte)50, "Trust
received by solving a captcha"); /* FIXME: We need null trust. Giving trust by
solving captchas is a REALLY bad idea */
- p.setSolved();
- db.store(p);
- db.commit();
+ synchronized(p) {
+ OwnIdentity puzzleOwner =
(OwnIdentity)p.getInserter();
+ Identity newIdentity =
Identity.importIntroductionFromXML(db, mIdentityFetcher,
result.asBucket().getInputStream());
+ puzzleOwner.setTrust(db, newIdentity, (byte)50,
"Trust received by solving a captcha"); /* FIXME: We need null trust. Giving
trust by solving captchas is a REALLY bad idea */
+ p.setSolved();
+ db.store(p);
+ db.commit();
+ Logger.debug(this, "Imported identity
introduction for identity " + newIdentity.getRequestURI());
+ }
- state.cancel(); /* FIXME: is this necessary */
- mRequests.remove(state);
+ removeRequest(state);
} catch (Exception e) {
Logger.error(this, "Parsing failed for "+
state.getURI(), e);
}
}
// Only called by inserts
- public synchronized void onSuccess(BaseClientPutter state)
+ public void onSuccess(BaseClientPutter state)
{
try {
IntroductionPuzzle p = IntroductionPuzzle.getByURI(db,
state.getURI());
Logger.debug(this, "Successful insert of puzzle from "
+ p.getInserter().getNickName() + ": " + p.getRequestURI());
} catch(Exception e) { Logger.error(this, "Error", e); }
- state.cancel(); /* FIXME: is this necessary */
- mInserts.remove(state);
+
+ removeInsert(state);
}
// Only called by inserts
- public synchronized void onFailure(InsertException e, BaseClientPutter
state)
+ public void onFailure(InsertException e, BaseClientPutter state)
{
try {
IntroductionPuzzle p = IntroductionPuzzle.getByURI(db,
state.getURI());
Logger.debug(this, "Insert of puzzle failed from " +
p.getInserter().getNickName() + ": " + p.getRequestURI(), e);
} catch(Exception ex) { Logger.error(this, "Error", e); }
- state.cancel(); /* FIXME: is this necessary */
- mInserts.remove(state);
+
+ removeInsert(state);
}
- /* Not needed functions from the ClientCallback inteface */
+ /* Not needed functions from the ClientCallback interface */
// Only called by inserts
public void onFetchable(BaseClientPutter state) {}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs