Revision: 17218
          http://sourceforge.net/p/gate/code/17218
Author:   valyt
Date:     2014-01-07 17:00:00 +0000 (Tue, 07 Jan 2014)
Log Message:
-----------
Started work on the composite index, now that we have an almost-working atomic 
token index.

Modified Paths:
--------------
    mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        2014-01-07 14:18:04 UTC (rev 17217)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        2014-01-07 17:00:00 UTC (rev 17218)
@@ -14,13 +14,25 @@
  */
 package gate.mimir;
 
+import gate.Gate;
+import gate.mimir.IndexConfig.TokenIndexerConfig;
 import gate.mimir.index.AtomicIndex;
 import gate.mimir.index.AtomicTokenIndex;
-import gate.mimir.index.Indexer;
+import gate.mimir.index.IndexException;
+import gate.mimir.index.mg4j.GATEDocument;
+import gate.mimir.index.mg4j.MimirIndexBuilder;
 import gate.mimir.search.QueryEngine;
+import gate.util.GateRuntimeException;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.log4j.Logger;
+
 /**
  * A Mímir index which can index document and answer queries. This class is the
  * main entry point to the Mímir API.
@@ -36,31 +48,193 @@
  */
 public class MimirIndex {
   
+
   /**
+   * The name of the file in the index directory where the index config is
+   * saved.
+   */
+  public static final String INDEX_CONFIG_FILENAME = "config.xml";
+  
+  private static final Logger logger = Logger.getLogger(MimirIndex.class);
+  
+  /**
+   * A {@link Runnable} that collects the documents from the sub-indexers and
+   * deletes them from GATE.
+   */
+  protected class DocumentCollector implements Runnable{
+    public void run(){
+      boolean finished = false;
+      while(!finished){
+        GATEDocument currentDocument = null;
+        try {
+          //get one document from each of the sub-indexers
+          //check identity and add to output queue.
+          for(AtomicIndex aSubIndexer : subIndexes){
+            GATEDocument aDoc = aSubIndexer.getOutputQueue().take();
+            if(currentDocument == null){
+              currentDocument = aDoc;
+            }else if(aDoc != currentDocument){
+              //malfunction!
+              throw new RuntimeException(
+                      "Out of order document received from sub-indexer!");
+            }
+          }
+          //we obtained the same document from all the sub-indexers
+          if(currentDocument != GATEDocument.END_OF_QUEUE) {
+            // let's delete it
+            logger.debug("Deleting document "
+                + currentDocument.getDocument().getName());
+            gate.Factory.deleteResource(currentDocument.getDocument());
+            logger.debug("Document deleted.  "
+                    + Gate.getCreoleRegister().getLrInstances(
+                        currentDocument.getDocument().getClass().getName())
+                            .size() + " documents still live.");            
+          } else {
+            // we're done
+            finished = true;
+          }
+        } catch(InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }  
+  
+  /**
    * The {@link IndexConfig} used for this index.
    */
   protected IndexConfig indexConfig;
 
   /**
+   * The top level directory containing this index.
+   */
+  protected File indexDirectory;
+  
+  /**
+   * The thread used to clean-up GATE documents after they have been indexed.
+   */
+  protected Thread documentsCollectorThread;
+  
+  protected volatile boolean closed = false;
+  
+  
+  /**
+   * The token indexes, in the order they are listed in the {@link #indexConfig}.
+   */
+  protected AtomicTokenIndex[] tokenIndexes;
+  
+  /**
+   * The annotation indexes, in the order they are listed in the 
+   * {@link #indexConfig}.
+   */
+  protected AtomicIndex[] mentionIndexes;
+
+  /**
+   * The {@link #tokenIndexes} and {@link #mentionIndexes} in one single array.
+   */
+  protected AtomicIndex[] subIndexes;
+  
+  
+  /**
    * Create a new Index.
    * @param indexConfig the configuration for the index.
    */
   public MimirIndex(IndexConfig indexConfig) {
-    super();
     this.indexConfig = indexConfig;
+    this.indexDirectory = this.indexConfig.getIndexDirectory();
+    
+    openIndex();
   }
   
   /**
    * Open and existing Index.
    * @param indexDirectory the on-disk directory containing the index to be 
    * opened.
+   * @throws IndexException if the index cannot be opened
+   * @throws IllegalArgumentException if an index cannot be found at the 
+   * specified location.
+   * @throws IOException if the index cannot be opened. 
    */
-  public MimirIndex(File indexDirectory ) {
-    // TODO
+  public MimirIndex(File indexDirectory ) throws IOException, IndexException {
+    if(!indexDirectory.isDirectory()) throw new IllegalArgumentException(
+        "No index found at " + indexDirectory);
+    File indexConfigFile = new File(indexDirectory, INDEX_CONFIG_FILENAME);
+    if(!indexConfigFile.canRead()) throw new IllegalArgumentException(
+        "Cannot read index confog from " + indexConfigFile); 
+    
+    this.indexConfig = IndexConfig.readConfigFromFile(indexConfigFile, 
+        indexDirectory);
+    this.indexDirectory = this.indexConfig.getIndexDirectory();
+    openIndex();
   }
   
-  protected AtomicTokenIndex[] tokenIndexes;
+  /**
+   * Opens the index files, if any, prepares all the sub-indexers specified in 
+   * the index config, and gets this index ready to start indexing documents and
+   * answer queries. 
+   */
+  protected void openIndex() {
+    // read the index config and create the sub-indexers
+    TokenIndexerConfig tokConfs[] = indexConfig.getTokenIndexers();
+    tokenIndexes = new AtomicTokenIndex[tokConfs.length];
+    for(int i = 0; i < tokConfs.length; i++) {
+      TokenIndexerConfig tokConf = tokConfs[i];
+      String subIndexname = "token-" + i;
+      tokenIndexes[i] = new AtomicTokenIndex(
+          this, 
+          subIndexname, 
+          new File(indexDirectory, subIndexname), 
+          tokConf.isDirectIndexEnabled(),
+          new LinkedBlockingQueue<GATEDocument>(),
+          new LinkedBlockingQueue<GATEDocument>(),
+          tokConf,
+          i == 0);
+    }
+    
+    //TODO
+    mentionIndexes = new AtomicIndex[0];
+    
+    // construct the joint array of sub-indexes
+    subIndexes = new AtomicIndex[tokenIndexes.length + mentionIndexes.length];
+    System.arraycopy(tokenIndexes, 0, subIndexes, 0, tokenIndexes.length);
+    System.arraycopy(mentionIndexes, 0, subIndexes, tokenIndexes.length, 
+        mentionIndexes.length);
+    
+    // start the collector thread
+    documentsCollectorThread = new Thread(new DocumentCollector());
+    documentsCollectorThread.start();
+  }
   
-  protected AtomicIndex[] mentionIndexes;
+  /**
+   * Queues a new document for indexing.
+   * @param document the document to be indexed.
+   * @throws InterruptedException if the process of posting the new document
+   * to all the input queues is interrupted.
+   * @throws IllegalStateException if the index has already been closed.
+   */
+  public void indexDocument(GATEDocument document) throws InterruptedException {
+    if(closed) throw new IllegalStateException("This index has been closed, "
+        + "no further documents can be indexed.");
+    for(AtomicIndex aSubIndex: subIndexes){
+      aSubIndex.getInputQueue().put(document);
+    }
+  }
   
+  /**
+   * Stops this index from accepting any further documents for indexing, stops
+   * this index from accepting any more queries, finishes indexing all the 
+   * currently queued documents, writes all the files to disk, and returns.
+   * @throws InterruptedException if interrupted while waiting for indexing to end.
+   */
+  public void close() throws InterruptedException {
+    if(!closed) {
+      closed = true;
+      for(AtomicIndex aSubIndex : subIndexes) {
+        aSubIndex.getInputQueue().put(GATEDocument.END_OF_QUEUE);
+      }      
+    }
+    // wait for indexing to end
+    documentsCollectorThread.join();
+  }
+  
 }

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 2014-01-07 14:18:04 UTC (rev 17217)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 2014-01-07 17:00:00 UTC (rev 17218)
@@ -947,28 +947,31 @@
            // start in-RAM indexing
            newBatch();
          if(inputQueue != null) {
-        while((aDocument = inputQueue.take()) != GATEDocument.END_OF_QUEUE){
-          try {
-            processDocument(aDocument);
-          } catch(Throwable e) {
-            logger.error("Problem while indexing document!", e);
-          }
-          //dump batch if needed AND there is data to dump
-          if (occurrencesPerBatch > 0 && occurrencesInRAM > occurrencesPerBatch){
+        do{
+          aDocument = inputQueue.take();
+          if(aDocument != GATEDocument.END_OF_QUEUE) {
+            try {
+              processDocument(aDocument);
+            } catch(Throwable e) {
+              logger.error("Problem while indexing document!", e);
+            }
+            //dump batch if needed AND there is data to dump
+            if (occurrencesPerBatch > 0 && occurrencesInRAM > occurrencesPerBatch){
+              writeCurrentTail();
+            }
+            if(indexCompressionRequested) {
+              compressIndex();
+            }
+            if(tailWriteRequested) {
+              writeCurrentTail();
+            }            
+          } else {
+            // close down
             writeCurrentTail();
+            flush();
           }
           outputQueue.put(aDocument);
-          
-          if(indexCompressionRequested) {
-            compressIndex();
-          }
-          if(tailWriteRequested) {
-            writeCurrentTail();
-          }
-        }
-        // we're done
-        writeCurrentTail();
-        flush();
+        } while(aDocument != GATEDocument.END_OF_QUEUE);
          }
          }catch(InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -1128,6 +1131,16 @@
     return indexDirectory;
   }
 
+  
+  
+  public BlockingQueue<GATEDocument> getInputQueue() {
+    return inputQueue;
+  }
+
+  public BlockingQueue<GATEDocument> getOutputQueue() {
+    return outputQueue;
+  }
+
   /**
    * Gets an {@link Index} value that can be used to search this atomic index.
    * This will normally be a {@link DocumentalCluster} view over all the 

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Rapidly troubleshoot problems before they affect your business. Most IT 
organizations don't have a clear picture of how application performance 
affects their revenue. With AppDynamics, you get 100% visibility into your 
Java,.NET, & PHP application. Start your 15-day FREE TRIAL of AppDynamics Pro!
http://pubads.g.doubleclick.net/gampad/clk?id=84349831&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to