Revision: 17218
http://sourceforge.net/p/gate/code/17218
Author: valyt
Date: 2014-01-07 17:00:00 +0000 (Tue, 07 Jan 2014)
Log Message:
-----------
Started work on the composite index, now that we have an almost-working atomic
token index.
Modified Paths:
--------------
mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2014-01-07 14:18:04 UTC (rev 17217)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2014-01-07 17:00:00 UTC (rev 17218)
@@ -14,13 +14,25 @@
*/
package gate.mimir;
+import gate.Gate;
+import gate.mimir.IndexConfig.TokenIndexerConfig;
import gate.mimir.index.AtomicIndex;
import gate.mimir.index.AtomicTokenIndex;
-import gate.mimir.index.Indexer;
+import gate.mimir.index.IndexException;
+import gate.mimir.index.mg4j.GATEDocument;
+import gate.mimir.index.mg4j.MimirIndexBuilder;
import gate.mimir.search.QueryEngine;
+import gate.util.GateRuntimeException;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.log4j.Logger;
+
/**
* A Mímir index which can index document and answer queries. This class is the
* main entry point to the Mímir API.
@@ -36,31 +48,193 @@
*/
public class MimirIndex {
+
 /**
+ * The name of the file in the index directory where the index config is
+ * saved.
+ */
+ public static final String INDEX_CONFIG_FILENAME = "config.xml";
+
+ /** log4j logger for this class. */
+ private static final Logger logger = Logger.getLogger(MimirIndex.class);
+
+ /**
+ * A {@link Runnable} that collects the documents from the sub-indexers and
+ * deletes them from GATE.
+ * <p>It repeatedly takes one document from the output queue of each
+ * sub-index (in {@link MimirIndex#subIndexes} order) and checks that all of
+ * them returned the very same instance; that document is then deleted with
+ * {@link gate.Factory#deleteResource}. The collector stops after receiving
+ * {@link GATEDocument#END_OF_QUEUE} from every sub-index, when there are no
+ * sub-indexes at all, or when its thread is interrupted (the interrupt flag
+ * is preserved).</p>
+ */
+ protected class DocumentCollector implements Runnable{
+ @Override
+ public void run(){
+ boolean finished = false;
+ while(!finished){
+ GATEDocument currentDocument = null;
+ try {
+ //get one document from each of the sub-indexers and check that they
+ //all returned the same instance.
+ for(AtomicIndex aSubIndexer : subIndexes){
+ GATEDocument aDoc = aSubIndexer.getOutputQueue().take();
+ if(currentDocument == null){
+ currentDocument = aDoc;
+ }else if(aDoc != currentDocument){
+ //malfunction! This terminates the collector thread.
+ throw new IllegalStateException(
+ "Out of order document received from sub-indexer!");
+ }
+ }
+ if(currentDocument == null
+ || currentDocument == GATEDocument.END_OF_QUEUE) {
+ // either there are no sub-indexes at all (nothing to collect), or
+ // every sub-index has signalled the end of its input: we're done
+ finished = true;
+ } else {
+ // we obtained the same document from all the sub-indexers:
+ // let's delete it
+ if(logger.isDebugEnabled()) {
+ logger.debug("Deleting document "
+ + currentDocument.getDocument().getName());
+ }
+ gate.Factory.deleteResource(currentDocument.getDocument());
+ if(logger.isDebugEnabled()) {
+ // only computed when debugging: counts the live LR instances
+ logger.debug("Document deleted. "
+ + Gate.getCreoleRegister().getLrInstances(
+ currentDocument.getDocument().getClass().getName())
+ .size() + " documents still live.");
+ }
+ }
+ } catch(InterruptedException e) {
+ // Restore the interrupt status and stop. Looping again would make
+ // take() throw straight away, spinning forever and burning CPU.
+ Thread.currentThread().interrupt();
+ finished = true;
+ }
+ }
+ }
+ }
+
+ /**
 * The {@link IndexConfig} used for this index.
 */
 protected IndexConfig indexConfig;
 /**
+ * The top level directory containing this index.
+ */
+ protected File indexDirectory;
+
+ /**
+ * The thread used to clean-up GATE documents after they have been indexed.
+ */
+ protected Thread documentsCollectorThread;
+
+ /**
+ * Set to true by {@link #close()}; once set,
+ * {@link #indexDocument(GATEDocument)} rejects new documents. Declared
+ * volatile, presumably so that threads calling indexDocument see the update
+ * made by close() - confirm intended threading model.
+ */
+ protected volatile boolean closed = false;
+
+
+ /**
+ * The token indexes, in the order they are listed in the {@link #indexConfig}.
+ */
+ protected AtomicTokenIndex[] tokenIndexes;
+
+ /**
+ * The annotation indexes, in the order they are listed in the
+ * {@link #indexConfig}.
+ */
+ protected AtomicIndex[] mentionIndexes;
+
+ /**
+ * The {@link #tokenIndexes} and {@link #mentionIndexes} in one single array.
+ * Documents are posted to, and collected from, the sub-indexes in this order.
+ */
+ protected AtomicIndex[] subIndexes;
+
+ /**
* Create a new Index.
* @param indexConfig the configuration for the index.
*/
public MimirIndex(IndexConfig indexConfig) {
- super();
this.indexConfig = indexConfig;
+ this.indexDirectory = this.indexConfig.getIndexDirectory();
+
+ openIndex();
}
/**
* Open and existing Index.
* @param indexDirectory the on-disk directory containing the index to be
* opened.
+ * @throws IndexException if the index cannot be opened
+ * @throws IllegalArgumentException if an index cannot be found at the
+ * specified location.
+ * @throws IOException if the index cannot be opened.
*/
- public MimirIndex(File indexDirectory ) {
- // TODO
+ public MimirIndex(File indexDirectory ) throws IOException, IndexException {
+ if(!indexDirectory.isDirectory()) throw new IllegalArgumentException(
+ "No index found at " + indexDirectory);
+ File indexConfigFile = new File(indexDirectory, INDEX_CONFIG_FILENAME);
+ if(!indexConfigFile.canRead()) throw new IllegalArgumentException(
+ "Cannot read index confog from " + indexConfigFile);
+
+ this.indexConfig = IndexConfig.readConfigFromFile(indexConfigFile,
+ indexDirectory);
+ this.indexDirectory = this.indexConfig.getIndexDirectory();
+ openIndex();
}
- protected AtomicTokenIndex[] tokenIndexes;
+ /**
+ * Opens the index files, if any, prepares all the sub-indexers specified in
+ * the index config, and gets this index ready to start indexing documents and
+ * answer queries.
+ * <p>One {@link AtomicTokenIndex} is created per token indexer in the config,
+ * located at <code>indexDirectory/token-</code><i>N</i> and given its own
+ * unbounded input and output queues. Finally the {@link DocumentCollector}
+ * thread is started.</p>
+ */
+ protected void openIndex() {
+ // read the index config and create the sub-indexers
+ TokenIndexerConfig[] tokConfs = indexConfig.getTokenIndexers();
+ tokenIndexes = new AtomicTokenIndex[tokConfs.length];
+ for(int i = 0; i < tokConfs.length; i++) {
+ TokenIndexerConfig tokConf = tokConfs[i];
+ String subIndexname = "token-" + i;
+ // NOTE(review): the last argument is true only for the first token
+ // sub-index; its meaning is defined by AtomicTokenIndex - confirm there.
+ tokenIndexes[i] = new AtomicTokenIndex(
+ this,
+ subIndexname,
+ new File(indexDirectory, subIndexname),
+ tokConf.isDirectIndexEnabled(),
+ new LinkedBlockingQueue<GATEDocument>(),
+ new LinkedBlockingQueue<GATEDocument>(),
+ tokConf,
+ i == 0);
+ }
+
+ //TODO: create the mention (annotation) indexes
+ mentionIndexes = new AtomicIndex[0];
+
+ // construct the joint array of sub-indexes; DocumentCollector polls the
+ // output queues in this same order
+ subIndexes = new AtomicIndex[tokenIndexes.length + mentionIndexes.length];
+ System.arraycopy(tokenIndexes, 0, subIndexes, 0, tokenIndexes.length);
+ System.arraycopy(mentionIndexes, 0, subIndexes, tokenIndexes.length,
+ mentionIndexes.length);
+
+ // start the collector thread (named, to make it easy to spot in dumps)
+ documentsCollectorThread = new Thread(new DocumentCollector(),
+ "Mimir document collector");
+ documentsCollectorThread.start();
+ }
- protected AtomicIndex[] mentionIndexes;
+ /**
+ * Queues a new document for indexing.
+ * <p>The document is posted to the input queue of every sub-index. The
+ * closed check and the posting happen while holding this index's monitor,
+ * the same lock {@link #close()} holds while posting the end-of-queue
+ * markers, so a document can never be interleaved with, or queued after,
+ * those markers. The input queues created by {@link #openIndex()} are
+ * unbounded, so holding the lock across the puts does not block.</p>
+ * <p>NOTE(review): if interrupted part-way, the document ends up in only
+ * some input queues, which the collector will later report as an
+ * out-of-order document.</p>
+ * @param document the document to be indexed.
+ * @throws InterruptedException if the process of posting the new document
+ * to all the input queues is interrupted.
+ * @throws IllegalStateException if the index has already been closed.
+ */
+ public synchronized void indexDocument(GATEDocument document)
+ throws InterruptedException {
+ if(closed) throw new IllegalStateException("This index has been closed, "
+ + "no further documents can be indexed.");
+ for(AtomicIndex aSubIndex: subIndexes){
+ aSubIndex.getInputQueue().put(document);
+ }
+ }
+ /**
+ * Stops this index from accepting any further document for indexing, stops
+ * this index from accepting any more queries, finishes indexing all the
+ * currently queued documents, writes all the files to disk, and returns.
+ * Calling this method more than once is harmless: later calls only wait for
+ * the document collector thread to finish.
+ * @throws InterruptedException if the calling thread is interrupted while
+ * posting the end-of-queue markers or while waiting for indexing to finish.
+ */
+ public void close() throws InterruptedException {
+ synchronized(this) {
+ if(!closed) {
+ closed = true;
+ for(AtomicIndex aSubIndex : subIndexes) {
+ aSubIndex.getInputQueue().put(GATEDocument.END_OF_QUEUE);
+ }
+ }
+ }
+ // wait for indexing to end; done outside the lock so that waiting does not
+ // block other callers (they will simply see the index as closed)
+ documentsCollectorThread.join();
+ }
+
}
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2014-01-07 14:18:04 UTC (rev 17217)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2014-01-07 17:00:00 UTC (rev 17218)
@@ -947,28 +947,31 @@
// start in-RAM indexing
newBatch();
if(inputQueue != null) {
- while((aDocument = inputQueue.take()) != GATEDocument.END_OF_QUEUE){
- try {
- processDocument(aDocument);
- } catch(Throwable e) {
- logger.error("Problem while indexing document!", e);
- }
- //dump batch if needed AND there is data to dump
- if (occurrencesPerBatch > 0 && occurrencesInRAM >
occurrencesPerBatch){
+ do{
+ aDocument = inputQueue.take();
+ if(aDocument != GATEDocument.END_OF_QUEUE) {
+ try {
+ processDocument(aDocument);
+ } catch(Throwable e) {
+ logger.error("Problem while indexing document!", e);
+ }
+ //dump batch if needed AND there is data to dump
+ if (occurrencesPerBatch > 0 && occurrencesInRAM >
occurrencesPerBatch){
+ writeCurrentTail();
+ }
+ if(indexCompressionRequested) {
+ compressIndex();
+ }
+ if(tailWriteRequested) {
+ writeCurrentTail();
+ }
+ } else {
+ // close down
writeCurrentTail();
+ flush();
}
outputQueue.put(aDocument);
-
- if(indexCompressionRequested) {
- compressIndex();
- }
- if(tailWriteRequested) {
- writeCurrentTail();
- }
- }
- // we're done
- writeCurrentTail();
- flush();
+ } while(aDocument != GATEDocument.END_OF_QUEUE);
}
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
@@ -1128,6 +1131,16 @@
return indexDirectory;
}
+
+
+ /**
+ * Returns the queue this atomic index takes documents from; each document
+ * found there is indexed, and the END_OF_QUEUE marker shuts indexing down.
+ * @return the input queue of this atomic index.
+ */
+ public BlockingQueue<GATEDocument> getInputQueue() {
+ return this.inputQueue;
+ }
+
+ /**
+ * Returns the queue onto which this atomic index places every document
+ * (including the END_OF_QUEUE marker) once it has finished with it.
+ * @return the output queue of this atomic index.
+ */
+ public BlockingQueue<GATEDocument> getOutputQueue() {
+ return this.outputQueue;
+ }
+
/**
* Gets an {@link Index} value that can be used to search this atomic index.
* This will normally be a {@link DocumentalCluster} view over all the
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Rapidly troubleshoot problems before they affect your business. Most IT
organizations don't have a clear picture of how application performance
affects their revenue. With AppDynamics, you get 100% visibility into your
Java, .NET, & PHP applications. Start your 15-day FREE TRIAL of AppDynamics Pro!
http://pubads.g.doubleclick.net/gampad/clk?id=84349831&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs