Revision: 17187
          http://sourceforge.net/p/gate/code/17187
Author:   valyt
Date:     2013-12-19 17:52:46 +0000 (Thu, 19 Dec 2013)
Log Message:
-----------
More work on the new live-index framework:
- we now use the MG4J API for writing out indexes
- base indexer functionality for both tokens and mentions
- started work on the new token indexer 

Modified Paths:
--------------
    mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java

Added Paths:
-----------
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        
2013-12-18 16:03:06 UTC (rev 17186)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        
2013-12-19 17:52:46 UTC (rev 17187)
@@ -15,6 +15,7 @@
 package gate.mimir;
 
 import gate.mimir.index.AtomicIndex;
+import gate.mimir.index.AtomicTokenIndex;
 import gate.mimir.index.Indexer;
 import gate.mimir.search.QueryEngine;
 
@@ -58,7 +59,7 @@
     // TODO
   }
   
-  protected AtomicIndex[] tokenIndexes;
+  protected AtomicTokenIndex[] tokenIndexes;
   
   protected AtomicIndex[] mentionIndexes;
   

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 
2013-12-18 16:03:06 UTC (rev 17186)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 
2013-12-19 17:52:46 UTC (rev 17187)
@@ -14,12 +14,42 @@
  */
 package gate.mimir.index;
 
+import gate.Annotation;
 import gate.Document;
 import gate.mimir.MimirIndex;
+import gate.mimir.index.mg4j.GATEDocument;
+import gate.util.GateRuntimeException;
+import it.unimi.di.big.mg4j.index.CompressionFlags;
+import it.unimi.di.big.mg4j.index.DiskBasedIndex;
 import it.unimi.di.big.mg4j.index.IndexReader;
+import it.unimi.di.big.mg4j.index.IndexWriter;
+import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
+import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
+import it.unimi.di.big.mg4j.io.IOFactory;
+import it.unimi.dsi.bits.Fast;
+import it.unimi.dsi.fastutil.Arrays;
+import it.unimi.dsi.fastutil.Hash;
+import it.unimi.dsi.fastutil.Swapper;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntComparator;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
+import it.unimi.dsi.io.OutputBitStream;
+import it.unimi.dsi.lang.MutableString;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteOrder;
+import java.util.concurrent.BlockingQueue;
 
+import org.apache.log4j.Logger;
+
+import com.google.common.io.PatternFilenameFilter;
+
 /**
  * An indirect index associating terms with documents. Terms can be either 
token
  * feature values, or semantic annotation URIs. Optionally, a direct index may 
@@ -30,8 +60,187 @@
  * all the new documents that have been queued for indexing since the last tail
  * was written are stored in RAM.  
  */
-public abstract class AtomicIndex {
+public abstract class AtomicIndex implements Runnable {
+  
+  /**
+   * An in-RAM representation of a postings list
+   */
+  protected static class PostingsList {
+    
+    /**
+     * The first document pointer added to this postings list.
+     */
+    private long firstDocumentPointer = -1;
+    
+    /**
+     * The last seen document pointer.
+     */
+    private long lastDocumentPointer = -1;
+    
+    /**
+     * The list of document pointer differentials (differences from 
+     * {@link #firstDocumentPointer}). For the sake of easy alignment, we 
+     * actaully store a <tt>0</tt> on the first position.
+     */
+    private IntList documentPointersDifferential;
+    
 
+    /**
+     * The list of counts for each document. This list is aligned with 
+     * {@link #documentPointersDifferential}.
+     */
+    private IntList counts;
+    
+    /**
+     * The list of positions in this postings list. For each document at 
position
+     * <tt>i</i>, there will be counts[i] positions stored in this list.
+     */
+    private IntArrayList positions;
+    
+    /**
+     * The last seen position.
+     */
+    private int lastPosition = -1;
+    
+    /**
+     * The number of position in the current document
+     */
+    private int count = 0; 
+    
+    /**
+     * The maximum count of all the stored documents
+     */
+    private int maxCount = 0;
+    
+    /**
+     * The number of document pointers contained
+     */
+    private long frequency = 0;
+    
+    /**
+     * The total number of occurrences stored.
+     */
+    private long occurrences = 0;
+    /**
+     * The sum of the maximum positions for each document.
+     */
+    private long sumMaxPos = 0;
+    
+    public PostingsList(boolean storePositions) {
+      firstDocumentPointer = -1;
+      documentPointersDifferential = new IntArrayList();
+      counts = new IntArrayList();
+      if(storePositions) {
+        positions = new IntArrayList();
+      }
+    }
+
+    /**
+     * Start storing the data for a new document
+     * @param pointer
+     */
+    public void newDocumentPointer(long pointer) {
+      // is this really a new document
+      if(pointer != lastDocumentPointer) {
+        if(firstDocumentPointer < 0) firstDocumentPointer = pointer;
+        if(lastDocumentPointer == -1) {
+          // this is the first document
+          documentPointersDifferential.add(0);      
+        } else {
+          // close previous document
+          flush();
+          // add the new document
+          documentPointersDifferential.add((int)(pointer - 
lastDocumentPointer));
+        }
+        lastDocumentPointer = pointer;
+        // reset the lastPosition when moving to a new document
+        lastPosition = -1;
+        
+        frequency++;
+      }
+    }
+
+    public void addPosition(int pos) {
+      // ignore if the position hasn't changed: we don't store two identical 
+      // records
+      if(pos != lastPosition) {
+        positions.add(pos);
+        count++;
+        //and update lastPosition
+        lastPosition = pos;        
+      }
+    }
+    
+    /**
+     * Checks whether the given position is valid (i.e. greater than the last 
+     * seen positions. If the position is invalid, this means that a call to
+     * {@link #addPosition(int)} with the same value would actually be a 
+     * no-operation.  
+     * @param pos
+     * @return
+     */
+    public boolean checkPosition(int pos){
+      return pos > lastPosition;
+    }
+    
+    /**
+     * Notifies this postings list that it has received all the data
+     */
+    public void flush() {
+      if(count > 0) {
+        // we have some new positions for the last document: they were already
+        // added to positions, but we now need to store their count
+        counts.add(count);
+        if(count > maxCount) maxCount = count;
+        sumMaxPos += positions.get(count - 1);
+        occurrences += count;
+      }
+      count = 0;
+    }
+    
+    /**
+     * Writes the data contained in this postings list to a index writer
+     * @param indexWriter
+     * @throws IOException 
+     */
+    public void write(IndexWriter indexWriter) throws IOException {
+      flush();
+      if(indexWriter instanceof QuasiSuccinctIndexWriter) {
+        ((QuasiSuccinctIndexWriter)indexWriter).newInvertedList(frequency,
+            occurrences, sumMaxPos);  
+      } else {
+        indexWriter.newInvertedList();
+      }
+      
+      indexWriter.writeFrequency(frequency);
+      long currDocumentPointer = firstDocumentPointer;
+      int positionsStart = 0;
+      
+      for(int docId = 0; docId < documentPointersDifferential.size(); docId++) 
{
+        currDocumentPointer += documentPointersDifferential.get(docId);
+        int currCount = counts.get(docId);
+        OutputBitStream obs = indexWriter.newDocumentRecord();
+        indexWriter.writeDocumentPointer(obs, currDocumentPointer);
+        indexWriter.writePositionCount(obs, currCount);
+StringBuilder str = new StringBuilder("Writing " + currCount + 
+    " positions from " + positionsStart + ": [");
+for(int i = positionsStart;  i < positionsStart + currCount; i++) {
+  str.append(positions.elements()[i]);
+  str.append(' ');
+}
+str.append("]");
+logger.info(str);
+
+        indexWriter.writeDocumentPositions(obs, positions.elements(),
+            positionsStart, currCount, -1);
+        
+        
+        positionsStart += currCount;
+      }
+    }
+  }
+  
+
   /**
    * The file name (under the current directory for this atomic index) which 
    * stores the principal index. 
@@ -63,9 +272,23 @@
    * directory containing the documents that have been queued for indexing, 
but 
    * not yet indexed. 
    */
-  public static final String DOCUMENTs_QUEUE_FILE_NAME = "queued-documents";
+  public static final String DOCUMENTS_QUEUE_FILE_NAME = "queued-documents";
   
   /**
+   * How many occurrences to index in each batch. This metric is more 
reliable, 
+   * than document counts, as it does not depend on average document size. 
+   */
+  public static final int DEFAULT_OCCURRENCES_PER_BATCH = 20 * 1000 * 1000;
+  
+  /** The initial size of the term map. */
+  private static final int INITIAL_TERM_MAP_SIZE = 1024;
+  
+  private static Logger logger = Logger.getLogger(AtomicIndex.class);
+  
+  protected static final PatternFilenameFilter TAILS_FILENAME_FILTER = 
+      new PatternFilenameFilter("\\Q" + TAIL_FILE_NAME_PREFIX + "\\E\\d+");
+  
+  /**
    * The name of this atomic index.
    */
   protected String name;
@@ -75,7 +298,7 @@
   /**
    * The number of occurrences stored in this index.
    */
-  protected long occurrences;
+  protected long totalOccurrences;
   
   /**
    * The number of occurrences represented in RAM and not yet written to disk. 
 
@@ -83,13 +306,99 @@
   protected long newOccurrences;
   
   /**
+   * How many occurrences to be accumulated in RAM before a new tail batch is
+   * written to disk.
+   */
+  protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
+  
+  /**
    * The {@link MimirIndex} that this atomic index is a member of.
    */
   protected MimirIndex parent;
   
   protected boolean hasDirectIndex;
   
-       /**
+  protected Thread indexingThread;
+  
+  /**
+   * Documents to be indexed are queued in this queue.
+   */
+  protected BlockingQueue<GATEDocument> inputQueue;
+  
+  /**
+   * Documents that have been indexed are passed on to this queue.
+   */
+  protected BlockingQueue<GATEDocument> outputQueue;
+
+    
+  /**
+   * The position of the current (or most-recently used) token in the current
+   * document.
+   */
+  protected int tokenPosition;
+  
+  /**
+   * A mutable string used to create instances of MutableString on the cheap.
+   */
+  protected MutableString currentTerm;
+  
+  /**
+   * The current document pointer (gets incremented for each document).
+   */
+  protected long documentPointer;
+  
+  /**
+   * An in-memory inverted index that gets dumped to files for each batch. 
+   */
+  protected Object2ReferenceOpenHashMap<MutableString, PostingsList> termMap;
+  
+  /**
+   * Creates a new AtomicIndex
+   * 
+   * @param parent the {@link MimirIndex} containing this atomic index.
+   * @param name the name of the sub-index, e.g. <em>token-i</em> or 
+   *  <em>mentions-j</em>
+   * @param indexDirectory the directory where this index should store all its 
+   *  files.
+   * @param hasDirectIndex should a direct index be used?
+   * @param inputQueue the input queue for documents to be indexed.
+   * @param outputQueue the output queue for documents that have been indexed.
+   */
+       protected AtomicIndex(MimirIndex parent, String name, File 
indexDirectory,
+      boolean hasDirectIndex, BlockingQueue<GATEDocument> inputQueue,
+      BlockingQueue<GATEDocument> outputQueue) {
+    super();
+    this.parent = parent;
+    this.name = name;
+    this.indexDirectory = indexDirectory;
+    this.hasDirectIndex = hasDirectIndex;
+    
+    this.inputQueue = inputQueue;
+    this.outputQueue = outputQueue;
+    
+    this.currentTerm = new MutableString();
+    
+    
+  }
+
+       protected void initIndex() {
+    if(indexDirectory.exists()) {
+      // opening an existing index
+      //TODO
+    } else {
+      // new index creation
+      indexDirectory.mkdirs();
+      
+      totalOccurrences = 0;
+      newOccurrences = 0;
+      documentPointer = 0;
+      
+      termMap = new Object2ReferenceOpenHashMap<MutableString, 
+          PostingsList>(INITIAL_TERM_MAP_SIZE, Hash.FAST_LOAD_FACTOR );
+    }    
+       }
+       
+  /**
         * Gets the name of this atomic index. This is used as the file name 
for the 
         * directory storing the index files.
         * @return
@@ -117,13 +426,81 @@
        
        /**
         * Writes all the data currently stored in RAM to a new tail index.
+        * @throws IOException 
         */
-       public void writeNewTail() {
-         //TODO 
-         // dump new tail
+       public void writeNewTail() throws IOException {
+         // find the name for the new tail
+         String[] existingTails = indexDirectory.list(TAILS_FILENAME_FILTER);
+         int tailNo = -1;
+         for(String aTail : existingTails) {
+           int aTailNo = 
Integer.parseInt(aTail.substring(TAIL_FILE_NAME_PREFIX.length()));
+           if(aTailNo > tailNo) tailNo = aTailNo;
+         }
+         tailNo++;
          
+         // Open an index writer for the new tail
+         String newTailName = TAIL_FILE_NAME_PREFIX + Integer.toString(tailNo);
+         File newTailDir = new File(indexDirectory, newTailName);
+         newTailDir.mkdir();
+         String mg4jBasename = new File(newTailDir, name).getAbsolutePath();
+         QuasiSuccinctIndexWriter indexWriter = new QuasiSuccinctIndexWriter(
+             IOFactory.FILESYSTEM_FACTORY,
+             mg4jBasename,
+             documentPointer,
+             Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
+             QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
+             CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX,
+             ByteOrder.nativeOrder());
+         // write the data from RAM
+    int numTerms = termMap.size();
+    logger.info( "Generating index for batch " + newTailName + 
+            "; documents: " + documentPointer + "; terms:" + numTerms + 
+            "; occurrences: " + newOccurrences );
+    
+    // We write down all term in appearance order in termArray.
+    final MutableString[] termArray = termMap.keySet().toArray(new 
MutableString[ numTerms ]);
+    // We sort the terms appearing in the batch and write them on disk.
+    Arrays.quickSort(0, termArray.length, 
+            new IntComparator() {
+              @Override
+              public int compare(Integer one, Integer other) {
+                return compare(one.intValue(), other.intValue());
+              }
+              
+              @Override
+              public int compare(int one, int other) {
+                return termArray[one].compareTo(termArray[other]);
+              }
+            },
+            new Swapper() {
+              @Override
+              public void swap(int one, int other) {
+                MutableString temp = termArray[one];
+                termArray[one] = termArray[other];
+                termArray[other] = temp;
+              }
+            });
+         // write the term map
+    PrintWriter pw = new PrintWriter( 
+        new OutputStreamWriter(new FastBufferedOutputStream(
+            new FileOutputStream(mg4jBasename + 
DiskBasedIndex.TERMS_EXTENSION), 
+            64 * 1024), 
+        "UTF-8" ));
+    for (MutableString t : termArray ) {
+      t.println( pw );
+    }
+    pw.close();
+    // write the actual index
+    int maxCount = 0;
+    for ( int i = 0; i < numTerms; i++ ) {
+      PostingsList postingsList = termMap.get( termArray[ i ] );
+      if ( maxCount < postingsList.maxCount ) maxCount = postingsList.maxCount;
+      postingsList.write(indexWriter);
+    }
+    
+    indexWriter.close();
          // merge new tail into index cluster
-         
+    
          if(hasDirectIndex) {
            // dump new direct tail (invert the tail just written)
            // merge new direct tail into direct index cluster
@@ -132,6 +509,8 @@
          // clear queued-documents folder
          
          newOccurrences = 0;
+         
+         termMap.clear();
        }
        
        
@@ -154,5 +533,173 @@
          // TODO
          return null;
        }
+
        
+       /**
+        * Runnable implementation: the logic of this run method is simply 
indexing
+        * documents queued to the input queue. To stop it, send a 
+        * {@link GATEDocument#END_OF_QUEUE} value to the input queue.
+        */
+       public void run() {
+         indexingThread = Thread.currentThread();
+         GATEDocument aDocument;
+         try{
+         initIndex();
+         if(inputQueue != null) {
+        while((aDocument = inputQueue.take()) != GATEDocument.END_OF_QUEUE){
+          try {
+            processDocument(aDocument);
+          } catch(Throwable e) {
+            logger.error("Problem while indexing document!", e);
+          }
+          //dump batch if needed AND there is data to dump
+          if (occurrencesPerBatch > 0 && newOccurrences > occurrencesPerBatch){
+            writeNewTail();
+          }
+          outputQueue.put(aDocument);
+        }
+        // we're done
+        writeNewTail();
+        flush();
+         }
+         }catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch(Exception e) {
+      throw new GateRuntimeException("Exception during indexing!", e);
+    } finally {
+      indexingThread = null;
+    }
+       }
+       
+       /**
+        * Closes all file-based resources.
+        * @throws IOException
+        */
+       protected void flush() throws IOException {
+         
+       }
+       
+       /**
+        * Notifies this index to stop its indexing operations, and waits for 
all data
+        * to be written. 
+        * @throws InterruptedException is the waiting thread is interrupted 
before 
+        * the indexing thread has finished writing all the data.
+        */
+       public void close() throws InterruptedException {
+    inputQueue.put(GATEDocument.END_OF_QUEUE);
+    if(indexingThread != null) {
+      indexingThread.join();
+    }
+       }
+
+  /**
+   * Hook for subclasses, called before processing the annotations
+   * for this document.  The default implementation is a no-op.
+   */
+  protected void documentStarting(GATEDocument gateDocument) throws 
IndexException {
+  }
+
+  /**
+   * Hook for subclasses, called after annotations for this document
+   * have been processed.  The default implementation is a no-op.
+   */
+  protected void documentEnding(GATEDocument gateDocument) throws 
IndexException {
+  }
+       
+  /**
+   * Get the annotations that are to be processed for a document,
+   * in increasing order of offset.
+   */
+  protected abstract Annotation[] getAnnotsToProcess(
+          GATEDocument gateDocument) throws IndexException;
+  
+  
+  /**
+   * Calculate the starting position for the given annotation, storing
+   * it in {@link #tokenPosition}.  The starting position is the
+   * index of the token within the document where the annotation starts,
+   * and <em>must</em> be &gt;= the previous value of tokenPosition.
+   * @param ann
+   * @param gateDocument
+   */
+  protected abstract void calculateStartPositionForAnnotation(Annotation ann,
+          GATEDocument gateDocument) throws IndexException;
+  
+  /**
+   * Determine the string (or strings, if there are alternatives) that should 
+   * be stored in the index for the given annotation.
+   * 
+   * If a single string value should be returned, it is more efficient to store
+   * the value in {@link #currentTerm}, in which case <code>null</code> should 
+   * be returned instead.
+   * 
+   * If the current term should not be indexed (e.g. it's a stop word), then 
+   * the implementation should return an empty String array.
+   * 
+   * @param ann
+   * @param gateDocument
+   */
+  protected abstract String[] calculateTermStringForAnnotation(Annotation ann,
+          GATEDocument gateDocument) throws IndexException;
+  
+  protected void processDocument(GATEDocument gateDocument) throws 
IndexException{
+    //zero document related counters
+    tokenPosition = 0;
+    documentStarting(gateDocument);
+    //get the annotations to be processed
+    Annotation[] annotsToProcess = getAnnotsToProcess(gateDocument);
+    logger.debug("Starting document "
+        + gateDocument.getDocument().getName() + ". "
+        + annotsToProcess.length + " annotations to process");    
+    try {
+      //process the annotations one by one.
+      for(Annotation ann : annotsToProcess){
+        // calculate the position and string for this annotation
+        calculateStartPositionForAnnotation(ann, gateDocument);
+        String[] terms = calculateTermStringForAnnotation(ann, gateDocument);
+        if(terms == null){
+          //the value was already stored in #currentTerm by the implementation.
+          indexCurrentTerm();
+        }else if(terms.length == 0){
+          //we received an empty array -> we should NOT index the current term
+        }else{
+          //we have received multiple values from the implementation
+          for(String aTerm : terms){
+            currentTerm.replace(aTerm == null ? "" : aTerm);
+            indexCurrentTerm();
+          }
+        }        
+      }
+    } catch (IOException e) {
+      throw new IndexException("IO Exception while indexing", e);
+    }finally {
+      documentEnding(gateDocument);
+      documentPointer++;
+    }
+  }
+  
+  
+  /**
+   * Adds the value in {@link #currentTerm} to the index.
+   * @throws IOException 
+   */
+  protected void indexCurrentTerm() throws IOException {
+    //check if we have seen this mention before
+    PostingsList termPostings = termMap.get(currentTerm);
+    if(termPostings == null){
+      //new term -> create a new postings list.
+      termMap.put( currentTerm.copy(), termPostings = new PostingsList(true));
+    }
+    //add the current posting to the current postings list
+    termPostings.newDocumentPointer(documentPointer);
+    //this is needed so that we don't increment the number of occurrences
+    //for duplicate values.
+    if(termPostings.checkPosition(tokenPosition)){
+      termPostings.addPosition(tokenPosition);
+      newOccurrences++;
+    } else {
+      logger.debug("Duplicate position");
+    }
+  }
+  
 }

Added: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java    
                        (rev 0)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java    
2013-12-19 17:52:46 UTC (rev 17187)
@@ -0,0 +1,232 @@
+/*
+ *  AtomicTokenIndex.java
+ *
+ *  Copyright (c) 2007-2013, The University of Sheffield.
+ *
+ *  This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html), 
+ *  and is free software, licenced under the GNU Lesser General Public License,
+ *  Version 3, June 2007 (also included with this distribution as file
+ *  LICENCE-LGPL3.html).
+ *
+ *  Valentin Tablan, 19 Dec 2013
+ *
+ *  $Id$
+ */
+package gate.mimir.index;
+
+import gate.Annotation;
+import gate.FeatureMap;
+import gate.mimir.DocumentMetadataHelper;
+import gate.mimir.MimirIndex;
+import gate.mimir.IndexConfig.TokenIndexerConfig;
+import gate.mimir.index.mg4j.GATEDocument;
+import gate.mimir.index.mg4j.GATEDocumentFactory;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollectionWriter;
+import gate.mimir.index.mg4j.zipcollection.DocumentData;
+import it.unimi.di.big.mg4j.index.TermProcessor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+
+/**
+ * An {@link AtomicIndex} implementation for indexing tokens.
+ */
+public class AtomicTokenIndex extends AtomicIndex {
+  
+  private final static Logger logger = 
Logger.getLogger(AtomicTokenIndex.class);
+  
+  /**
+   * A constant (empty String array) used for filtering terms from indexing.
+   * @see #calculateTermStringForAnnotation(Annotation, GATEDocument)
+   * implementation.
+   */
+  private static final String[] DO_NOT_INDEX = new String[]{};
+  
+  /**
+   * A zip collection builder used to build a zip of the collection
+   * if this has been requested.
+   */
+  protected DocumentCollectionWriter collectionWriter = null;
+  
+  /**
+   * An array of helpers for creating document metadata. 
+   */
+  protected DocumentMetadataHelper[] docMetadataHelpers;
+  
+  /**
+   * Stores the document URI for writing to the zip collection;
+   */
+  protected String documentURI;
+  
+  /**
+   * Stores the document title for writing to the zip collection. 
+   */
+  protected String documentTitle;
+  
+  /**
+   * Stores the document tokens for writing to the zip collection;
+   */
+  protected List<String> documentTokens;
+  
+  /**
+   * Stores the document non-tokens for writing to the zip collection;
+   */
+  protected List<String> documentNonTokens;
+  
+  
+  /**
+   * GATE document factory used by the zip builder, and also to
+   * translate field indexes to field names.
+   */
+  protected GATEDocumentFactory factory;
+  
+  
+  /**
+   * The feature name corresponding to the field.
+   */
+  protected String featureName;
+  
+  
+  /**
+   * The term processor used to process the feature values being indexed.
+   */
+  protected TermProcessor termProcessor;
+  
+  /**
+   * @param parent
+   * @param name
+   * @param indexDirectory
+   * @param hasDirectIndex
+   * @param inputQueue
+   * @param outputQueue
+   */
+  public AtomicTokenIndex(MimirIndex parent, String name, File indexDirectory,
+      boolean hasDirectIndex, BlockingQueue<GATEDocument> inputQueue,
+      BlockingQueue<GATEDocument> outputQueue, TokenIndexerConfig config,
+      boolean zipCollection) {
+    super(parent, name, indexDirectory, hasDirectIndex, inputQueue, 
outputQueue);
+    this.featureName = config.getFeatureName();
+    this.termProcessor = config.getTermProcessor();
+    
+    if(zipCollection) {
+      logger.info("Creating zipped collection for field \"" + name + "\"");
+      collectionWriter = new DocumentCollectionWriter(indexDirectory);
+    }
+    
+    indexingThread = new Thread(this, "Mimir-" + name + " indexing thread");
+    indexingThread.start();
+  }
+
+  
+  /**
+   * If zipping, inform the collection builder that a new document
+   * is about to start.
+   */
+  protected void documentStarting(GATEDocument gateDocument) throws 
IndexException {
+    if(collectionWriter != null) {
+      documentURI = gateDocument.uri().toString();
+      documentTitle = gateDocument.title().toString();
+      documentTokens = new LinkedList<String>();
+      documentNonTokens = new LinkedList<String>();
+      if(docMetadataHelpers != null){
+        for(DocumentMetadataHelper aHelper : docMetadataHelpers){
+          aHelper.documentStart(gateDocument);
+        }
+      }
+    }
+    // set lastTokenIndex to -1 so we don't have to special-case the first
+    // token in the document in calculateStartPosition
+    tokenPosition = -1;
+  }
+
+  /**
+   * If zipping, inform the collection builder that we finished
+   * the current document.
+   */
+  protected void documentEnding(GATEDocument gateDocument) throws 
IndexException {
+    if(collectionWriter != null) {
+      DocumentData docData = new DocumentData(documentURI, 
+              documentTitle, 
+              documentTokens.toArray(new String[documentTokens.size()]),
+              documentNonTokens.toArray(new 
String[documentNonTokens.size()])); 
+      if(docMetadataHelpers != null){
+        for(DocumentMetadataHelper aHelper : docMetadataHelpers){
+          aHelper.documentEnd(gateDocument, docData);
+        }
+      }
+      collectionWriter.writeDocument(docData);
+      documentTokens = null;
+      documentNonTokens = null;
+    }
+  }
+
+  /**
+   * Get the token annotations from this document, in increasing
+   * order of offset.
+   */
+  protected Annotation[] getAnnotsToProcess(GATEDocument gateDocument) {
+    return gateDocument.getTokenAnnots();
+  }
+
+  /**
+   * This indexer always adds one posting per token, so the start
+   * position for the next annotation is always one more than the
+   * previous one.
+   * 
+   * @param ann
+   * @param gateDocument
+   */
+  protected void calculateStartPositionForAnnotation(Annotation ann,
+          GATEDocument gateDocument) {
+    tokenPosition++;
+  }
+
+  /**
+   * For a token annotation, the "string" we index is the feature value
+   * corresponding to the name of the field to index.  As well as
+   * calculating the string, this method writes an entry to the zip
+   * collection builder if it exists.
+   * 
+   * @param ann
+   * @param gateDocument
+   */
+  protected String[] calculateTermStringForAnnotation(Annotation ann,
+          GATEDocument gateDocument) throws IndexException {
+    FeatureMap tokenFeatures = ann.getFeatures();
+    String value = (String)tokenFeatures.get(featureName);
+    currentTerm.replace(value == null ? "" : value);
+    //save the *unprocessed* term to the collection, if required.
+    if(collectionWriter != null) {
+      documentTokens.add(currentTerm.toString());
+      documentNonTokens.add(gateDocument.getNonTokens()[tokenPosition]);
+    }
+    if(termProcessor.processTerm(currentTerm)){
+      //the processor has changed the term, and allowed us to index it
+      return null;  
+    }else{
+      //the processor has filtered the term -> don't index it.
+      return DO_NOT_INDEX;
+    }
+    
+  }
+
+  /**
+   * Overridden to close the zip collection builder.
+   */
+  @Override
+  protected void flush() throws IOException {
+    if(collectionWriter != null) {
+      logger.info("Saving zipped collection");
+      collectionWriter.close();
+    }
+    super.flush();
+  }  
+  
+
+}


Property changes on: 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Rapidly troubleshoot problems before they affect your business. Most IT 
organizations don't have a clear picture of how application performance 
affects their revenue. With AppDynamics, you get 100% visibility into your 
Java,.NET, & PHP application. Start your 15-day FREE TRIAL of AppDynamics Pro!
http://pubads.g.doubleclick.net/gampad/clk?id=84349831&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to