Repository: jena Updated Branches: refs/heads/master 273ef7708 -> b4da74768
JENA-848 : Fix jena-text transaction subsystem (for Lucene, the isolation level is now serializable, and 2-phase commit is used to make it as atomic as possible) Project: http://git-wip-us.apache.org/repos/asf/jena/repo Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/767dd930 Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/767dd930 Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/767dd930 Branch: refs/heads/master Commit: 767dd930b3285065db6091957e38c8831cf1e24d Parents: 4ee8d9e Author: Stephen Allen <[email protected]> Authored: Thu Jan 15 17:12:31 2015 -0500 Committer: Stephen Allen <[email protected]> Committed: Thu Jan 15 17:25:35 2015 -0500 ---------------------------------------------------------------------- jena-text/src/main/java/jena/textindexer.java | 4 +- .../jena/query/text/DatasetGraphText.java | 121 ++++++++++++------- .../jena/query/text/TextDatasetFactory.java | 2 +- .../query/text/TextDocProducerEntities.java | 13 +- .../jena/query/text/TextDocProducerTriples.java | 22 +++- .../org/apache/jena/query/text/TextIndex.java | 20 +-- .../apache/jena/query/text/TextIndexLucene.java | 73 +++++++---- .../apache/jena/query/text/TextIndexSolr.java | 47 ++++--- .../text/TestLuceneWithMultipleThreads.java | 113 ++++++++++++----- 9 files changed, 283 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/jena/textindexer.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/jena/textindexer.java b/jena-text/src/main/java/jena/textindexer.java index e5586fc..eab736f 100644 --- a/jena-text/src/main/java/jena/textindexer.java +++ b/jena-text/src/main/java/jena/textindexer.java @@ -107,7 +107,6 @@ public class textindexer extends CmdARQ { @Override protected void exec() { Set<Node> properties = getIndexedProperties() ; - textIndex.startIndexing() ; // there are various strategies possible here // what is implemented is a first cut simple approach @@ -130,7 +129,8 @@ public class textindexer extends CmdARQ { } } } - textIndex.finishIndexing() ; + + textIndex.commit(); progressMonitor.close() ; } http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java b/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java index 1681323..c5655b5 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java @@ -28,8 +28,11 @@ import org.slf4j.LoggerFactory ; import com.hp.hpl.jena.graph.Graph ; import com.hp.hpl.jena.graph.Node ; import com.hp.hpl.jena.query.ReadWrite ; -import com.hp.hpl.jena.sparql.JenaTransactionException ; -import com.hp.hpl.jena.sparql.core.* ; +import com.hp.hpl.jena.sparql.core.DatasetGraph ; +import com.hp.hpl.jena.sparql.core.DatasetGraphMonitor ; +import com.hp.hpl.jena.sparql.core.DatasetGraphWithLock ; +import com.hp.hpl.jena.sparql.core.GraphView ; +import com.hp.hpl.jena.sparql.core.Transactional ; public class DatasetGraphText extends DatasetGraphMonitor implements Transactional { @@ -38,7 +41,13 @@ public class DatasetGraphText extends DatasetGraphMonitor implements Transaction private final Transactional dsgtxn ; private final Graph dftGraph ; private final boolean closeIndexOnClose; - + + + // If we are going to implement Transactional, then we are going to have to do as DatasetGraphWithLock and + // TDB's DatasetGraphTransaction do and track transaction state in a ThreadLocal + private final ThreadLocal<ReadWrite> readWriteMode = new ThreadLocal<ReadWrite>(); + + public DatasetGraphText(DatasetGraph dsg, TextIndex index, TextDocProducer producer) { this(dsg, index, producer, false); @@ -56,7 +65,7 @@ public class DatasetGraphText extends DatasetGraphMonitor implements Transaction this.closeIndexOnClose = closeIndexOnClose; } - // ---- Intecept these and force the use of views. + // ---- Intercept these and force the use of views. @Override public Graph getDefaultGraph() { return dftGraph ; @@ -99,66 +108,94 @@ public class DatasetGraphText extends DatasetGraphMonitor implements Transaction return results.iterator() ; } - // Imperfect. - private boolean needFinish = false ; - @Override public void begin(ReadWrite readWrite) { + readWriteMode.set(readWrite); dsgtxn.begin(readWrite) ; - // textIndex.begin(readWrite) ; - if ( readWrite == ReadWrite.WRITE ) { - // WRONG design - super.getMonitor().start() ; - // Right design. - // textIndex.startIndexing() ; - needFinish = true ; - } + super.getMonitor().start() ; } - + + /** + * Rollback all changes, discarding any exceptions that occur. + */ + @Override + public void abort() { + // Roll back all both objects, discarding any exceptions that occur + try { dsgtxn.abort(); } catch (Throwable t) { log.warn("Exception in abort: " + t.getMessage(), t); } + try { textIndex.rollback(); } catch (Throwable t) { log.warn("Exception in abort: " + t.getMessage(), t); } + + readWriteMode.set(null) ; + super.getMonitor().finish() ; + } + + /** + * Perform a 2-phase commit by first calling prepareCommit() on the TextIndex + * followed by committing the Transaction object, and then calling commit() + * on the TextIndex(). + * <p> + * If either of the objects fail on either the preparation or actual commit, + * it terminates and calls {@link #rollback()} on both of them. + * <p> + * <b>NOTE:</b> it may happen that the TextIndex fails to commit, after the + * Transactional has already successfully committed. A rollback instruction will + * still be issued, but depending on the implementation, it may not have any effect. + */ @Override public void commit() { - try { - if ( needFinish ) { - super.getMonitor().finish() ; - // textIndex.finishIndexing() ; + // Phase 1 + if (readWriteMode.get() == ReadWrite.WRITE) { + try { + textIndex.prepareCommit(); + } + catch (Throwable t) { + log.error("Exception in prepareCommit: " + t.getMessage(), t) ; + abort(); + throw new TextIndexException(t); } - needFinish = false ; - // textIndex.commit() ; - dsgtxn.commit() ; - } - catch (Throwable ex) { - log.warn("Exception in commit: " + ex.getMessage(), ex) ; - dsgtxn.abort() ; - throw ex; } - } - - @Override - public void abort() { + + // Phase 2 try { - if ( needFinish ) - textIndex.abortIndexing() ; - dsgtxn.abort() ; + dsgtxn.commit(); + if (readWriteMode.get() == ReadWrite.WRITE) { + textIndex.commit(); + } } - catch (JenaTransactionException ex) { throw ex ; } - catch (RuntimeException ex) { - log.warn("Exception in abort: " + ex.getMessage(), ex) ; - throw ex ; + catch (Throwable t) { + log.error("Exception in commit: " + t.getMessage(), t) ; + abort(); + throw new TextIndexException(t); } + readWriteMode.set(null); + super.getMonitor().finish() ; } @Override public boolean isInTransaction() { - return dsgtxn.isInTransaction() ; + return readWriteMode.get() != null; } @Override public void end() { + // If we are still in a write transaction at this point, then commit was never called, so rollback the TextIndex + if (readWriteMode.get() == ReadWrite.WRITE) { + try { + textIndex.rollback(); + } + catch (Throwable t) { + log.warn("Exception in end: " + t.getMessage(), t) ; + } + } + try { - // textIndex.end() ; dsgtxn.end() ; } - catch (Throwable ex) { log.warn("Exception in end: " + ex.getMessage(), ex) ; } + catch (Throwable t) { + log.warn("Exception in end: " + t.getMessage(), t) ; + } + + readWriteMode.set(null) ; + super.getMonitor().finish() ; } @Override http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java b/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java index aad5e93..f6ab21b 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java @@ -66,8 +66,8 @@ public class TextDatasetFactory DatasetGraph dsgt = new DatasetGraphText(dsg, textIndex, producer, closeIndexOnDSGClose) ; // Also set on dsg Context c = dsgt.getContext() ; + c.set(TextQuery.textIndex, textIndex) ; - dsgt.getContext().set(TextQuery.textIndex, textIndex) ; return dsgt ; } http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java index 9a3b1f0..9e2de8d 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java @@ -35,27 +35,30 @@ public class TextDocProducerEntities extends DatasetChangesBatched implements Te private static Logger log = LoggerFactory.getLogger(TextDocProducer.class) ; private final EntityDefinition defn ; private final TextIndex indexer ; - private boolean started = false ; + + // Also have to have a ThreadLocal here to keep track of whether or not we are in a transaction, + // therefore whether or not we have to do autocommit + private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() ; public TextDocProducerEntities(EntityDefinition defn, TextIndex indexer) { this.defn = defn ; this.indexer = indexer ; + inTransaction.set(false) ; } @Override protected void startBatched() { - indexer.startIndexing() ; - started = true ; + inTransaction.set(true) ; } @Override protected void finishBatched() { - indexer.finishIndexing() ; + inTransaction.set(false) ; } @Override protected void dispatch(QuadAction quadAction, List<Quad> batch) { - if ( !started ) + if ( !inTransaction.get() ) throw new IllegalStateException("Not started") ; if ( !QuadAction.ADD.equals(quadAction) ) return ; http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java index 2f0a679..3700eb0 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java @@ -28,22 +28,26 @@ public class TextDocProducerTriples implements TextDocProducer { private static Logger log = LoggerFactory.getLogger(TextDocProducerTriples.class) ; private final EntityDefinition defn ; private final TextIndex indexer ; - private boolean started = false ; + + // Also have to have a ThreadLocal here to keep track of whether or not we are in a transaction, + // therefore whether or not we have to do autocommit + private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() ; + public TextDocProducerTriples(EntityDefinition defn, TextIndex indexer) { this.defn = defn ; this.indexer = indexer ; + inTransaction.set(false) ; } @Override public void start() { - indexer.startIndexing() ; - started = true ; + inTransaction.set(true) ; } @Override public void finish() { - indexer.finishIndexing() ; + inTransaction.set(false) ; } @Override @@ -54,8 +58,14 @@ public class TextDocProducerTriples implements TextDocProducer { return ; Entity entity = TextQueryFuncs.entityFromQuad(defn, g, s, p, o) ; - if ( entity != null ) - // Null means does not match defn + // Null means does not match defn + if ( entity != null ) { indexer.addEntity(entity) ; + + // Auto commit the entity if we aren't in a transaction + if (!inTransaction.get()) { + indexer.commit() ; + } + } } } http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java b/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java index 26acdf5..3aabd5b 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java @@ -28,23 +28,27 @@ import com.hp.hpl.jena.graph.Node ; /** TextIndex abstraction */ public interface TextIndex extends Closeable //, Transactional { + // Transactional operations + void prepareCommit() ; + void commit() ; + void rollback() ; + + // Update operations - public abstract void startIndexing() ; - public abstract void addEntity(Entity entity) ; - public abstract void finishIndexing() ; - public abstract void abortIndexing() ; + void addEntity(Entity entity) ; + // read operations /** Get all entries for uri */ - public abstract Map<String, Node> get(String uri) ; + Map<String, Node> get(String uri) ; //** score // Need to have more complex results. /** Access the index - limit if -1 for as many as possible */ - public abstract List<Node> query(String qs, int limit) ; + List<Node> query(String qs, int limit) ; - public abstract List<Node> query(String qs) ; + List<Node> query(String qs) ; - public abstract EntityDefinition getDocDef() ; + EntityDefinition getDocDef() ; } http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java index 47a804d..36d4050 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java @@ -74,9 +74,20 @@ public class TextIndexLucene implements TextIndex { private final EntityDefinition docDef ; private final Directory directory ; - private final IndexWriter indexWriter ; private final Analyzer analyzer ; + // The IndexWriter can't be final because we may have to recreate it if rollback() is called. + // However, it needs to be volatile in case the next write transaction is on a different thread, + // but we do not need locking because we are assuming that there can only be one writer + // at a time (enforced elsewhere). + private volatile IndexWriter indexWriter ; + + /** + * Constructs a new TextIndexLucene. + * + * @param directory The Lucene Directory for the index + * @param def The EntityDefinition that defines how entities are stored in the index + */ public TextIndexLucene(Directory directory, EntityDefinition def) { this.directory = directory ; this.docDef = def ; @@ -97,17 +108,23 @@ public class TextIndexLucene implements TextIndex { this.analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer(VER), analyzerPerField) ; + openIndexWriter(); + } + + private void openIndexWriter() { IndexWriterConfig wConfig = new IndexWriterConfig(VER, analyzer) ; try { indexWriter = new IndexWriter(directory, wConfig) ; + // Force a commit to create the index, otherwise querying before writing will cause an exception + indexWriter.commit(); } catch (IOException e) { throw new TextIndexException(e) ; } } - + public Directory getDirectory() { return directory ; } @@ -119,28 +136,40 @@ public class TextIndexLucene implements TextIndex { public IndexWriter getIndexWriter() { return indexWriter; } - + @Override - public void startIndexing() { } - + public void prepareCommit() { + try { + indexWriter.prepareCommit(); + } + catch (IOException e) { + throw new TextIndexException(e); + } + } + @Override - public void finishIndexing() { + public void commit() { try { - indexWriter.commit() ; + indexWriter.commit(); } catch (IOException e) { - exception(e) ; + throw new TextIndexException(e); } } - + @Override - public void abortIndexing() { + public void rollback() { + IndexWriter idx = indexWriter; + indexWriter = null; try { - indexWriter.rollback() ; + idx.rollback(); } - catch (IOException ex) { - exception(ex) ; + catch (IOException e) { + throw new TextIndexException(e); } + + // The rollback will close the indexWriter, so we need to reopen it + openIndexWriter(); } @Override @@ -149,7 +178,7 @@ public class TextIndexLucene implements TextIndex { indexWriter.close() ; } catch (IOException ex) { - exception(ex) ; + throw new TextIndexException(ex) ; } } @@ -162,7 +191,7 @@ public class TextIndexLucene implements TextIndex { indexWriter.addDocument(doc) ; } catch (IOException e) { - exception(e) ; + throw new TextIndexException(e) ; } } @@ -187,7 +216,7 @@ public class TextIndexLucene implements TextIndex { @Override public Map<String, Node> get(String uri) { try { - IndexReader indexReader = DirectoryReader.open(indexWriter, true); + IndexReader indexReader = DirectoryReader.open(directory); List<Map<String, Node>> x = get$(indexReader, uri) ; if ( x.size() == 0 ) return null ; @@ -196,8 +225,7 @@ public class TextIndexLucene implements TextIndex { return x.get(0) ; } catch (Exception ex) { - exception(ex) ; - return null ; + throw new TextIndexException(ex) ; } } @@ -248,12 +276,11 @@ public class TextIndexLucene implements TextIndex { @Override public List<Node> query(String qs, int limit) { //** score - try (IndexReader indexReader = DirectoryReader.open(indexWriter, true)) { + try (IndexReader indexReader = DirectoryReader.open(directory)) { return query$(indexReader, qs, limit) ; } catch (Exception ex) { - exception(ex) ; - return null ; + throw new TextIndexException(ex) ; } } @@ -287,8 +314,4 @@ public class TextIndexLucene implements TextIndex { // TEMP return NodeFactoryExtra.createLiteralNode(v, null, null) ; } - - private static void exception(Exception ex) { - throw new TextIndexException(ex) ; - } } http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java ---------------------------------------------------------------------- diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java index 41d32ba..45571bc 100644 --- a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java +++ b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java @@ -18,7 +18,11 @@ package org.apache.jena.query.text; -import java.util.* ; +import java.io.IOException ; +import java.util.ArrayList ; +import java.util.HashMap ; +import java.util.List ; +import java.util.Map ; import java.util.Map.Entry ; import org.apache.solr.client.solrj.SolrQuery ; @@ -39,10 +43,10 @@ import com.hp.hpl.jena.sparql.util.NodeFactoryExtra ; public class TextIndexSolr implements TextIndex { - private static Logger log = LoggerFactory.getLogger(TextIndexSolr.class) ; + private static final Logger log = LoggerFactory.getLogger(TextIndexSolr.class) ; private final SolrServer solrServer ; - private EntityDefinition docDef ; - private static int MAX_N = 10000 ; + private final EntityDefinition docDef ; + private static final int MAX_N = 10000 ; public TextIndexSolr(SolrServer server, EntityDefinition def) { @@ -51,21 +55,32 @@ public class TextIndexSolr implements TextIndex } @Override - public void startIndexing() - {} - + public void prepareCommit() { } + @Override - public void finishIndexing() - { - try { solrServer.commit() ; } - catch (Exception ex) { exception(ex) ; } + public void commit() { + try { + solrServer.commit(); + } + catch (SolrServerException e) { + throw new TextIndexException(e); + } + catch (IOException e) { + throw new TextIndexException(e); + } } - + @Override - public void abortIndexing() - { - try { solrServer.rollback() ; } - catch (Exception ex) { exception(ex) ; } + public void rollback() { + try { + solrServer.rollback(); + } + catch (SolrServerException e) { + throw new TextIndexException(e); + } + catch (IOException e) { + throw new TextIndexException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java ---------------------------------------------------------------------- diff --git a/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java b/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java index 4788596..b60110d 100644 --- a/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java +++ b/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java @@ -18,18 +18,21 @@ package org.apache.jena.query.text; +import java.util.ArrayList ; +import java.util.List ; import java.util.concurrent.ExecutionException ; import java.util.concurrent.ExecutorService ; import java.util.concurrent.Executors ; import java.util.concurrent.Future ; import java.util.concurrent.TimeUnit ; +import org.apache.jena.query.text.assembler.TextVocab ; import org.apache.lucene.analysis.standard.StandardAnalyzer ; import org.apache.lucene.store.RAMDirectory ; import org.apache.lucene.util.Version ; -import org.junit.Before ; import org.junit.Test ; +import com.hp.hpl.jena.graph.NodeFactory ; import com.hp.hpl.jena.query.Dataset ; import com.hp.hpl.jena.query.DatasetFactory ; import com.hp.hpl.jena.query.QueryExecution ; @@ -38,8 +41,7 @@ import com.hp.hpl.jena.query.ReadWrite ; import com.hp.hpl.jena.query.ResultSet ; import com.hp.hpl.jena.rdf.model.Model ; import com.hp.hpl.jena.rdf.model.ResourceFactory ; -import com.hp.hpl.jena.sparql.core.DatasetGraph ; -import com.hp.hpl.jena.sparql.core.Transactional ; +import com.hp.hpl.jena.sparql.core.DatasetGraphFactory ; import com.hp.hpl.jena.sparql.modify.GraphStoreNullTransactional ; import com.hp.hpl.jena.vocabulary.RDFS ; @@ -58,20 +60,10 @@ public class TestLuceneWithMultipleThreads entDef.setAnalyzer("label", analyzer); } - private DatasetGraph dsg; - private Transactional tx; - - @Before - public void setup() - { - dsg = TextDatasetFactory.createLucene(new GraphStoreNullTransactional(), new RAMDirectory(), entDef); - tx = (Transactional)dsg; - } - - @Test public void testReadInMiddleOfWrite() throws InterruptedException, ExecutionException { + final DatasetGraphText dsg = (DatasetGraphText)TextDatasetFactory.createLucene(new GraphStoreNullTransactional(), new RAMDirectory(), entDef); final Dataset ds = DatasetFactory.create(dsg); final ExecutorService execService = Executors.newSingleThreadExecutor(); final Future<?> f = execService.submit(new Runnable() @@ -82,7 +74,7 @@ public class TestLuceneWithMultipleThreads // Hammer the dataset with a series of read queries while (!Thread.interrupted()) { - tx.begin(ReadWrite.READ); + dsg.begin(ReadWrite.READ); try { QueryExecution qExec = QueryExecutionFactory.create("select * where { ?s ?p ?o }", ds); @@ -91,17 +83,17 @@ public class TestLuceneWithMultipleThreads { rs.next(); } - tx.commit(); + dsg.commit(); } finally { - tx.end(); + dsg.end(); } } } }); - tx.begin(ReadWrite.WRITE); + dsg.begin(ReadWrite.WRITE); try { Model m = ds.getDefaultModel(); @@ -110,11 +102,11 @@ public class TestLuceneWithMultipleThreads Thread.sleep(100); m.add(ResourceFactory.createResource("http://example.org/"), RDFS.comment, "comment"); - tx.commit(); + dsg.commit(); } finally { - tx.end(); + dsg.end(); } execService.shutdownNow(); @@ -127,9 +119,10 @@ public class TestLuceneWithMultipleThreads @Test public void testWriteInMiddleOfRead() throws InterruptedException, ExecutionException { + final DatasetGraphText dsg = (DatasetGraphText)TextDatasetFactory.createLucene(new GraphStoreNullTransactional(), new RAMDirectory(), entDef); final int numReads = 10; final Dataset ds = DatasetFactory.create(dsg); - final ExecutorService execService = Executors.newFixedThreadPool(10); //.newSingleThreadExecutor(); + final ExecutorService execService = Executors.newFixedThreadPool(10); final Future<?> f = execService.submit(new Runnable() { @Override @@ -137,7 +130,7 @@ public class TestLuceneWithMultipleThreads { while (!Thread.interrupted()) { - tx.begin(ReadWrite.WRITE); + dsg.begin(ReadWrite.WRITE); try { Model m = ds.getDefaultModel(); @@ -153,11 +146,11 @@ public class TestLuceneWithMultipleThreads } m.add(ResourceFactory.createResource("http://example.org/"), RDFS.comment, "comment"); - tx.commit(); + dsg.commit(); } finally { - tx.end(); + dsg.end(); } } } @@ -165,7 +158,7 @@ public class TestLuceneWithMultipleThreads for (int i=0; i<numReads; i++) { - tx.begin(ReadWrite.READ); + dsg.begin(ReadWrite.READ); try { QueryExecution qExec = QueryExecutionFactory.create("select * where { ?s ?p ?o }", ds); @@ -176,11 +169,11 @@ public class TestLuceneWithMultipleThreads } // Sleep for a bit so that the writer thread can get in between the reads Thread.sleep(100); - tx.commit(); + dsg.commit(); } finally { - tx.end(); + dsg.end(); } } @@ -190,4 +183,70 @@ public class TestLuceneWithMultipleThreads // If there was an exception in the write thread then Future.get() will throw an ExecutionException assertTrue(f.get() == null); } + + @Test + public void testIsolation() throws InterruptedException, ExecutionException { + + final DatasetGraphText dsg = (DatasetGraphText)TextDatasetFactory.createLucene(DatasetGraphFactory.createMem(), new RAMDirectory(), entDef); + + final int numReaders = 2; + final List<Future<?>> futures = new ArrayList<Future<?>>(numReaders); + final ExecutorService execService = Executors.newFixedThreadPool(numReaders); + final Dataset ds = DatasetFactory.create(dsg); + + + for (int i=0; i<numReaders; i++) { + futures.add(execService.submit(new Runnable() { + @Override + public void run() + { + while (!Thread.interrupted()) { + dsg.begin(ReadWrite.READ); + try { + QueryExecution qExec = QueryExecutionFactory.create( + "select * where { graph <http://example.org/graph> { ?s <" + TextVocab.pfQuery + "> (<" + RDFS.label.getURI() + "> \"test\") } }", ds); +// "select * where { graph <http://example.org/graph> { ?s <" + RDFS.label.getURI() + "> \"test\" } }", ds); + ResultSet rs = qExec.execSelect(); + assertFalse(rs.hasNext()); + dsg.commit(); + } + finally { + dsg.end(); + } + + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + break; + } + } + } + })); + } + + // Give the read threads a chance to start up + Thread.sleep(500); + dsg.begin(ReadWrite.WRITE); + try { + dsg.add(NodeFactory.createURI("http://example.org/graph"), NodeFactory.createURI("http://example.org/test"), RDFS.label.asNode(), NodeFactory.createLiteral("test")); + + // Now give the read threads a chance to note the change + Thread.sleep(500); + + // Don't commit this change + } + finally { + dsg.end(); + } + // Just in case dsg.end() inappropriately commits the change + Thread.sleep(500); + + execService.shutdownNow(); + execService.awaitTermination(1000, TimeUnit.MILLISECONDS); + for(Future<?> f : futures) { + assertTrue(f.get() == null); + } + } + }
