Revision: 17187
http://sourceforge.net/p/gate/code/17187
Author: valyt
Date: 2013-12-19 17:52:46 +0000 (Thu, 19 Dec 2013)
Log Message:
-----------
More work on the new live-index framework:
- we now use the MG4J API for writing out indexes
- base indexer functionality for both tokens and mentions
- started work on the new token indexer
Modified Paths:
--------------
mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
Added Paths:
-----------
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2013-12-18 16:03:06 UTC (rev 17186)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2013-12-19 17:52:46 UTC (rev 17187)
@@ -15,6 +15,7 @@
package gate.mimir;
import gate.mimir.index.AtomicIndex;
+import gate.mimir.index.AtomicTokenIndex;
import gate.mimir.index.Indexer;
import gate.mimir.search.QueryEngine;
@@ -58,7 +59,7 @@
// TODO
}
- protected AtomicIndex[] tokenIndexes;
+ protected AtomicTokenIndex[] tokenIndexes;
protected AtomicIndex[] mentionIndexes;
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2013-12-18 16:03:06 UTC (rev 17186)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2013-12-19 17:52:46 UTC (rev 17187)
@@ -14,12 +14,42 @@
*/
package gate.mimir.index;
+import gate.Annotation;
import gate.Document;
import gate.mimir.MimirIndex;
+import gate.mimir.index.mg4j.GATEDocument;
+import gate.util.GateRuntimeException;
+import it.unimi.di.big.mg4j.index.CompressionFlags;
+import it.unimi.di.big.mg4j.index.DiskBasedIndex;
import it.unimi.di.big.mg4j.index.IndexReader;
+import it.unimi.di.big.mg4j.index.IndexWriter;
+import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
+import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
+import it.unimi.di.big.mg4j.io.IOFactory;
+import it.unimi.dsi.bits.Fast;
+import it.unimi.dsi.fastutil.Arrays;
+import it.unimi.dsi.fastutil.Hash;
+import it.unimi.dsi.fastutil.Swapper;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntComparator;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
+import it.unimi.dsi.io.OutputBitStream;
+import it.unimi.dsi.lang.MutableString;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteOrder;
+import java.util.concurrent.BlockingQueue;
+import org.apache.log4j.Logger;
+
+import com.google.common.io.PatternFilenameFilter;
+
/**
* An indirect index associating terms with documents. Terms can be either
token
* feature values, or semantic annotation URIs. Optionally, a direct index may
@@ -30,8 +60,187 @@
* all the new documents that have been queued for indexing since the last tail
* was written are stored in RAM.
*/
-public abstract class AtomicIndex {
+public abstract class AtomicIndex implements Runnable {
+
+  /**
+   * An in-RAM representation of a postings list for a single term: the
+   * documents the term occurs in (stored as gaps between consecutive document
+   * pointers), the number of occurrences in each document and, optionally,
+   * the positions of those occurrences.
+   */
+  protected static class PostingsList {
+
+    /**
+     * The first document pointer added to this postings list.
+     */
+    private long firstDocumentPointer = -1;
+
+    /**
+     * The last seen document pointer.
+     */
+    private long lastDocumentPointer = -1;
+
+    /**
+     * The list of document pointer differentials (each entry is the
+     * difference from the previous document pointer; the running sum starts
+     * at {@link #firstDocumentPointer}). For the sake of easy alignment, we
+     * actually store a <tt>0</tt> on the first position.
+     */
+    private IntList documentPointersDifferential;
+
+    /**
+     * The list of counts for each document. This list is aligned with
+     * {@link #documentPointersDifferential}.
+     */
+    private IntList counts;
+
+    /**
+     * The list of positions in this postings list. For each document at
position
+     * <tt>i</tt>, there will be counts[i] positions stored in this list.
+     * Only allocated when this list was created with storePositions == true.
+     */
+    private IntArrayList positions;
+
+    /**
+     * The last seen position (reset to -1 for every new document).
+     */
+    private int lastPosition = -1;
+
+    /**
+     * The number of positions recorded so far for the current document.
+     */
+    private int count = 0;
+
+    /**
+     * The maximum count of all the stored documents
+     */
+    private int maxCount = 0;
+
+    /**
+     * The number of document pointers contained
+     */
+    private long frequency = 0;
+
+    /**
+     * The total number of occurrences stored.
+     */
+    private long occurrences = 0;
+    /**
+     * The sum of the maximum positions for each document.
+     */
+    private long sumMaxPos = 0;
+
+    public PostingsList(boolean storePositions) { // positions is only allocated when storePositions is true
+      firstDocumentPointer = -1;
+      documentPointersDifferential = new IntArrayList();
+      counts = new IntArrayList();
+      if(storePositions) {
+        positions = new IntArrayList();
+      }
+    }
+
+    /**
+     * Start storing the data for a new document. Calling this again with the
+     * same pointer as the previous call is a no-op; otherwise the previous
+     * document (if any) is closed via {@link #flush()} first.
+     * @param pointer the document pointer of the new document.
+     */
+    public void newDocumentPointer(long pointer) {
+      // is this really a new document
+      if(pointer != lastDocumentPointer) {
+        if(firstDocumentPointer < 0) firstDocumentPointer = pointer;
+        if(lastDocumentPointer == -1) {
+          // this is the first document
+          documentPointersDifferential.add(0);
+        } else {
+          // close previous document
+          flush();
+          // add the new document
+          documentPointersDifferential.add((int)(pointer - // NOTE(review): narrowing cast; a gap larger than Integer.MAX_VALUE would silently overflow
lastDocumentPointer));
+        }
+        lastDocumentPointer = pointer;
+        // reset the lastPosition when moving to a new document
+        lastPosition = -1;
+
+        frequency++;
+      }
+    }
+
+    public void addPosition(int pos) { // records pos for the current document, skipping an immediate repeat of the last position
+      // ignore if the position hasn't changed: we don't store two identical
+      // records
+      if(pos != lastPosition) {
+        positions.add(pos); // NOTE(review): NullPointerException if this list was created with storePositions == false
+        count++;
+        //and update lastPosition
+        lastPosition = pos;
+      }
+    }
+
+    /**
+     * Checks whether the given position is valid (i.e. greater than the last
+     * seen position). If the position equals the last one, a call to
+     * {@link #addPosition(int)} with the same value would actually be a
+     * no-operation; smaller values would be recorded out of order.
+     * @param pos the candidate position.
+     * @return true iff pos is strictly greater than the last recorded position.
+     */
+    public boolean checkPosition(int pos){
+      return pos > lastPosition;
+    }
+
+    /**
+     * Closes the record for the current document: if any positions were added
+     * since the last flush, stores their count and updates the maxCount,
+     * sumMaxPos and occurrences statistics. Always resets the per-document
+     * count.
+     */
+    public void flush() {
+      if(count > 0) { // NOTE(review): a document with zero positions adds no counts entry, misaligning counts with documentPointersDifferential
+        // we have some new positions for the last document: they were already
+        // added to positions, but we now need to store their count
+        counts.add(count);
+        if(count > maxCount) maxCount = count;
+        sumMaxPos += positions.get(count - 1); // NOTE(review): likely bug; this reads from the start of the whole list, not the current document's last position (lastPosition)
+        occurrences += count;
+      }
+      count = 0;
+    }
+
+    /**
+     * Writes the data contained in this postings list to an index writer,
+     * after flushing the current document. For a QuasiSuccinctIndexWriter the
+     * frequency, occurrences and sumMaxPos statistics are passed up-front.
+     * @param indexWriter the writer receiving this inverted list.
+     * @throws IOException if the index writer fails.
+     */
+    public void write(IndexWriter indexWriter) throws IOException {
+      flush();
+      if(indexWriter instanceof QuasiSuccinctIndexWriter) {
+        ((QuasiSuccinctIndexWriter)indexWriter).newInvertedList(frequency,
+            occurrences, sumMaxPos);
+      } else {
+        indexWriter.newInvertedList();
+      }
+
+      indexWriter.writeFrequency(frequency);
+      long currDocumentPointer = firstDocumentPointer;
+      int positionsStart = 0;
+
+      for(int docId = 0; docId < documentPointersDifferential.size(); docId++)
{
+        currDocumentPointer += documentPointersDifferential.get(docId);
+        int currCount = counts.get(docId);
+        OutputBitStream obs = indexWriter.newDocumentRecord();
+        indexWriter.writeDocumentPointer(obs, currDocumentPointer);
+        indexWriter.writePositionCount(obs, currCount);
+StringBuilder str = new StringBuilder("Writing " + currCount +
+        " positions from " + positionsStart + ": [");
+for(int i = positionsStart; i < positionsStart + currCount; i++) {
+  str.append(positions.elements()[i]);
+  str.append(' ');
+}
+str.append("]");
+logger.info(str); // NOTE(review): leftover debug output; logs every document of every term at INFO level
+
+        indexWriter.writeDocumentPositions(obs, positions.elements(),
+            positionsStart, currCount, -1);
+
+
+        positionsStart += currCount;
+      }
+    }
+  }
+
+
/**
* The file name (under the current directory for this atomic index) which
* stores the principal index.
@@ -63,9 +272,23 @@
* directory containing the documents that have been queued for indexing,
but
* not yet indexed.
*/
-  public static final String DOCUMENTs_QUEUE_FILE_NAME = "queued-documents";
+  public static final String DOCUMENTS_QUEUE_FILE_NAME = "queued-documents";
/**
+   * How many occurrences to index in each batch. This metric is more
reliable
+   * than document counts, as it does not depend on average document size.
+   */
+  public static final int DEFAULT_OCCURRENCES_PER_BATCH = 20 * 1000 * 1000;
+
+  /** The initial size of the term map. */
+  private static final int INITIAL_TERM_MAP_SIZE = 1024;
+
+  private static Logger logger = Logger.getLogger(AtomicIndex.class); // also used by the static nested PostingsList
+
+  protected static final PatternFilenameFilter TAILS_FILENAME_FILTER = // matches TAIL_FILE_NAME_PREFIX followed by digits
+      new PatternFilenameFilter("\\Q" + TAIL_FILE_NAME_PREFIX + "\\E\\d+");
+
+  /**
* The name of this atomic index.
*/
protected String name;
@@ -75,7 +298,7 @@
/**
* The number of occurrences stored in this index.
*/
-  protected long occurrences;
+  protected long totalOccurrences;
/**
* The number of occurrences represented in RAM and not yet written to disk.
@@ -83,13 +306,99 @@
protected long newOccurrences;
/**
+   * How many occurrences to be accumulated in RAM before a new tail batch is
+   * written to disk. A value of 0 or less disables intermediate batches: a
+   * tail is then only written when the end-of-queue marker is received.
+   */
+  protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
+
+  /**
* The {@link MimirIndex} that this atomic index is a member of.
*/
protected MimirIndex parent;
protected boolean hasDirectIndex;
-  /**
+  protected Thread indexingThread; // NOTE(review): written by the indexing thread and read by close() without volatile/synchronization
+
+  /**
+   * Documents to be indexed are queued in this queue.
+   */
+  protected BlockingQueue<GATEDocument> inputQueue;
+
+  /**
+   * Documents that have been indexed are passed on to this queue.
+   */
+  protected BlockingQueue<GATEDocument> outputQueue;
+
+
+  /**
+   * The position of the current (or most-recently used) token in the current
+   * document.
+   */
+  protected int tokenPosition;
+
+  /**
+   * Reusable buffer holding the term currently being indexed; a copy is only
+   * made when a new term is inserted into {@link #termMap}.
+   */
+  protected MutableString currentTerm;
+
+  /**
+   * The current document pointer (gets incremented for each document).
+   */
+  protected long documentPointer;
+
+  /**
+   * An in-memory inverted index that gets dumped to files for each batch.
+   * Created by initIndex() and cleared after each tail is written.
+   */
+  protected Object2ReferenceOpenHashMap<MutableString, PostingsList> termMap;
+
+  /**
+   * Creates a new AtomicIndex. This only stores the configuration and
+   * allocates the reusable {@link #currentTerm} buffer; the index directory
+   * and the in-RAM term map are set up later by initIndex(), which run()
+   * calls on the indexing thread. The base class does not start a thread.
+   *
+   * @param parent the {@link MimirIndex} containing this atomic index.
+   * @param name the name of the sub-index, e.g. <em>token-i</em> or
+   * <em>mentions-j</em>
+   * @param indexDirectory the directory where this index should store all its
+   * files.
+   * @param hasDirectIndex should a direct index be used?
+   * @param inputQueue the input queue for documents to be indexed.
+   * @param outputQueue the output queue for documents that have been indexed.
+   */
+  protected AtomicIndex(MimirIndex parent, String name, File
indexDirectory,
+      boolean hasDirectIndex, BlockingQueue<GATEDocument> inputQueue,
+      BlockingQueue<GATEDocument> outputQueue) {
+    super();
+    this.parent = parent;
+    this.name = name;
+    this.indexDirectory = indexDirectory;
+    this.hasDirectIndex = hasDirectIndex;
+
+    this.inputQueue = inputQueue;
+    this.outputQueue = outputQueue;
+
+    this.currentTerm = new MutableString();
+
+
+  }
+
+  protected void initIndex() { // prepares the index directory and in-RAM state; called at the start of run()
+    if(indexDirectory.exists()) {
+      // opening an existing index
+      //TODO: not implemented yet; termMap stays null, so indexCurrentTerm() would throw a NullPointerException
+    } else {
+      // new index creation
+      indexDirectory.mkdirs(); // NOTE(review): the boolean result is ignored, so a creation failure goes unnoticed here
+
+      totalOccurrences = 0;
+      newOccurrences = 0;
+      documentPointer = 0;
+
+      termMap = new Object2ReferenceOpenHashMap<MutableString,
+          PostingsList>(INITIAL_TERM_MAP_SIZE, Hash.FAST_LOAD_FACTOR );
+    }
+  }
+
+ /**
* Gets the name of this atomic index. This is used as the file name
for the
* directory storing the index files.
* @return
@@ -117,13 +426,81 @@
/**
* Writes all the data currently stored in RAM to a new tail index.
+ * @throws IOException
*/
- public void writeNewTail() {
- //TODO
- // dump new tail
+ public void writeNewTail() throws IOException {
+ // find the name for the new tail
+ String[] existingTails = indexDirectory.list(TAILS_FILENAME_FILTER);
+ int tailNo = -1;
+ for(String aTail : existingTails) {
+ int aTailNo =
Integer.parseInt(aTail.substring(TAIL_FILE_NAME_PREFIX.length()));
+ if(aTailNo > tailNo) tailNo = aTailNo;
+ }
+ tailNo++;
+ // Open an index writer for the new tail
+ String newTailName = TAIL_FILE_NAME_PREFIX + Integer.toString(tailNo);
+ File newTailDir = new File(indexDirectory, newTailName);
+ newTailDir.mkdir();
+ String mg4jBasename = new File(newTailDir, name).getAbsolutePath();
+ QuasiSuccinctIndexWriter indexWriter = new QuasiSuccinctIndexWriter(
+ IOFactory.FILESYSTEM_FACTORY,
+ mg4jBasename,
+ documentPointer,
+ Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
+ QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
+ CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX,
+ ByteOrder.nativeOrder());
+ // write the data from RAM
+ int numTerms = termMap.size();
+ logger.info( "Generating index for batch " + newTailName +
+ "; documents: " + documentPointer + "; terms:" + numTerms +
+ "; occurrences: " + newOccurrences );
+
+ // We write down all term in appearance order in termArray.
+ final MutableString[] termArray = termMap.keySet().toArray(new
MutableString[ numTerms ]);
+ // We sort the terms appearing in the batch and write them on disk.
+ Arrays.quickSort(0, termArray.length,
+ new IntComparator() {
+ @Override
+ public int compare(Integer one, Integer other) {
+ return compare(one.intValue(), other.intValue());
+ }
+
+ @Override
+ public int compare(int one, int other) {
+ return termArray[one].compareTo(termArray[other]);
+ }
+ },
+ new Swapper() {
+ @Override
+ public void swap(int one, int other) {
+ MutableString temp = termArray[one];
+ termArray[one] = termArray[other];
+ termArray[other] = temp;
+ }
+ });
+ // write the term map
+ PrintWriter pw = new PrintWriter(
+ new OutputStreamWriter(new FastBufferedOutputStream(
+ new FileOutputStream(mg4jBasename +
DiskBasedIndex.TERMS_EXTENSION),
+ 64 * 1024),
+ "UTF-8" ));
+ for (MutableString t : termArray ) {
+ t.println( pw );
+ }
+ pw.close();
+ // write the actual index
+ int maxCount = 0;
+ for ( int i = 0; i < numTerms; i++ ) {
+ PostingsList postingsList = termMap.get( termArray[ i ] );
+ if ( maxCount < postingsList.maxCount ) maxCount = postingsList.maxCount;
+ postingsList.write(indexWriter);
+ }
+
+ indexWriter.close();
// merge new tail into index cluster
-
+
if(hasDirectIndex) {
// dump new direct tail (invert the tail just written)
// merge new direct tail into direct index cluster
@@ -132,6 +509,8 @@
// clear queued-documents folder
newOccurrences = 0;
+
+ termMap.clear();
}
@@ -154,5 +533,173 @@
// TODO
return null;
}
+
+  /**
+   * Runnable implementation: the logic of this run method is simply
indexing
+   * documents queued to the input queue. To stop it, send a
+   * {@link GATEDocument#END_OF_QUEUE} value to the input queue.
+   * Every document taken from the input queue is put on the output queue,
+   * even if processing it failed. A new tail is written whenever more than
+   * occurrencesPerBatch occurrences are held in RAM, and once more when the
+   * end-of-queue marker arrives, after which flush() is called.
+   */
+  public void run() {
+    indexingThread = Thread.currentThread();
+    GATEDocument aDocument;
+    try{
+      initIndex();
+      if(inputQueue != null) { // NOTE(review): with no input queue, no tail is written and flush() is never called
+        while((aDocument = inputQueue.take()) != GATEDocument.END_OF_QUEUE){
+          try {
+            processDocument(aDocument);
+          } catch(Throwable e) { // failures are logged and the document is still forwarded below
+            logger.error("Problem while indexing document!", e);
+          }
+          //dump batch if needed AND there is data to dump
+          if (occurrencesPerBatch > 0 && newOccurrences > occurrencesPerBatch){
+            writeNewTail();
+          }
+          outputQueue.put(aDocument);
+        }
+        // we're done
+        writeNewTail(); // NOTE(review): runs even when newOccurrences == 0, producing a tail with no terms
+        flush();
+      }
+    }catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch(Exception e) {
+      throw new GateRuntimeException("Exception during indexing!", e);
+    } finally {
+      indexingThread = null;
+    }
+  }
+
+  /**
+   * Hook called by run() after the final tail has been written, intended for
+   * closing file-based resources held by subclasses. The base implementation
+   * does nothing.
+   * @throws IOException if closing a resource fails.
+   */
+  protected void flush() throws IOException {
+
+  }
+
+  /**
+   * Notifies this index to stop its indexing operations, and waits for
all data
+   * to be written. This enqueues {@link GATEDocument#END_OF_QUEUE} and joins
+   * the indexing thread, if one is currently recorded.
+   * @throws InterruptedException if the waiting thread is interrupted
before
+   * the indexing thread has finished writing all the data.
+   */
+  public void close() throws InterruptedException {
+    inputQueue.put(GATEDocument.END_OF_QUEUE);
+    if(indexingThread != null) { // NOTE(review): unsynchronized read; run() clears this field in its finally block
+      indexingThread.join();
+    }
+  }
+
+  /**
+   * Hook for subclasses, called before processing the annotations
+   * for this document. The default implementation is a no-op.
+   * processDocument() resets tokenPosition to 0 before calling this hook, so
+   * implementations may override that starting value.
+   * @param gateDocument the document about to be processed.
+   */
+  protected void documentStarting(GATEDocument gateDocument) throws
IndexException {
+  }
+
+  /**
+   * Hook for subclasses, called after annotations for this document
+   * have been processed. The default implementation is a no-op.
+   * processDocument() calls this from a finally block, so it also runs when
+   * processing the annotations failed.
+   * @param gateDocument the document that has just been processed.
+   */
+  protected void documentEnding(GATEDocument gateDocument) throws
IndexException {
+  }
+
+  /**
+   * Get the annotations that are to be processed for a document,
+   * in increasing order of offset.
+   * @param gateDocument the document being indexed.
+   * @return the annotations to index; must not be null (processDocument()
+   * reads the array length).
+   */
+  protected abstract Annotation[] getAnnotsToProcess(
+      GATEDocument gateDocument) throws IndexException;
+
+
+  /**
+   * Calculate the starting position for the given annotation, storing
+   * it in {@link #tokenPosition}. The starting position is the
+   * index of the token within the document where the annotation starts,
+   * and <em>must</em> be {@code >=} the previous value of tokenPosition.
+   * @param ann the annotation being indexed.
+   * @param gateDocument the document containing the annotation.
+   */
+  protected abstract void calculateStartPositionForAnnotation(Annotation ann,
+      GATEDocument gateDocument) throws IndexException;
+
+  /**
+   * Determine the string (or strings, if there are alternatives) that should
+   * be stored in the index for the given annotation.
+   *
+   * If a single string value should be returned, it is more efficient to store
+   * the value in {@link #currentTerm}, in which case <code>null</code> should
+   * be returned instead.
+   *
+   * If the current term should not be indexed (e.g. it's a stop word), then
+   * the implementation should return an empty String array.
+   *
+   * @param ann the annotation being indexed.
+   * @param gateDocument the document containing the annotation.
+   * @return null (use currentTerm), an empty array (index nothing), or the
+   * terms to index; null elements are indexed as the empty string.
+   */
+  protected abstract String[] calculateTermStringForAnnotation(Annotation ann,
+      GATEDocument gateDocument) throws IndexException;
+
+  protected void processDocument(GATEDocument gateDocument) throws
IndexException{
+    // Indexes one document at documentPointer. First, reset per-document state (documentStarting() may still override tokenPosition).
+    tokenPosition = 0;
+    documentStarting(gateDocument); // NOTE(review): this and getAnnotsToProcess() run outside the try, so a failure here skips documentPointer++ while run() still forwards the document
+    //get the annotations to be processed
+    Annotation[] annotsToProcess = getAnnotsToProcess(gateDocument);
+    logger.debug("Starting document "
+        + gateDocument.getDocument().getName() + ". "
+        + annotsToProcess.length + " annotations to process");
+    try {
+      //process the annotations one by one.
+      for(Annotation ann : annotsToProcess){
+        // calculate the position and string for this annotation
+        calculateStartPositionForAnnotation(ann, gateDocument);
+        String[] terms = calculateTermStringForAnnotation(ann, gateDocument);
+        if(terms == null){
+          //the value was already stored in #currentTerm by the implementation.
+          indexCurrentTerm();
+        }else if(terms.length == 0){
+          //we received an empty array -> we should NOT index the current term
+        }else{
+          //we have received multiple values from the implementation
+          for(String aTerm : terms){
+            currentTerm.replace(aTerm == null ? "" : aTerm);
+            indexCurrentTerm();
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new IndexException("IO Exception while indexing", e);
+    }finally {
+      documentEnding(gateDocument);
+      documentPointer++; // advanced even on failure; NOTE(review): skipped if documentEnding() throws
+    }
+  }
+
+
+  /**
+   * Adds the value in {@link #currentTerm} to the index, as an occurrence at
+   * {@link #tokenPosition} in the document {@link #documentPointer}. A new
+   * occurrence is only counted when the position is greater than the last
+   * one recorded for this term in this document.
+   * @throws IOException declared, but nothing in this method throws it.
+   */
+  protected void indexCurrentTerm() throws IOException {
+    //check if we have seen this term before
+    PostingsList termPostings = termMap.get(currentTerm);
+    if(termPostings == null){
+      //new term -> create a new postings list.
+      termMap.put( currentTerm.copy(), termPostings = new PostingsList(true)); // copy: currentTerm is a reused buffer
+    }
+    //add the current posting to the current postings list
+    termPostings.newDocumentPointer(documentPointer);
+    //this is needed so that we don't increment the number of occurrences
+    //for duplicate values.
+    if(termPostings.checkPosition(tokenPosition)){
+      termPostings.addPosition(tokenPosition);
+      newOccurrences++;
+    } else {
+      logger.debug("Duplicate position");
+    }
+  }
+
}
Added: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
(rev 0)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
2013-12-19 17:52:46 UTC (rev 17187)
@@ -0,0 +1,232 @@
+/*
+ * AtomicTokenIndex.java
+ *
+ * Copyright (c) 2007-2013, The University of Sheffield.
+ *
+ * This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
+ * and is free software, licenced under the GNU Lesser General Public License,
+ * Version 3, June 2007 (also included with this distribution as file
+ * LICENCE-LGPL3.html).
+ *
+ * Valentin Tablan, 19 Dec 2013
+ *
+ * $Id$
+ */
+package gate.mimir.index;
+
+import gate.Annotation;
+import gate.FeatureMap;
+import gate.mimir.DocumentMetadataHelper;
+import gate.mimir.MimirIndex;
+import gate.mimir.IndexConfig.TokenIndexerConfig;
+import gate.mimir.index.mg4j.GATEDocument;
+import gate.mimir.index.mg4j.GATEDocumentFactory;
+import gate.mimir.index.mg4j.zipcollection.DocumentCollectionWriter;
+import gate.mimir.index.mg4j.zipcollection.DocumentData;
+import it.unimi.di.big.mg4j.index.TermProcessor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+
+/**
+ * An {@link AtomicIndex} implementation for indexing tokens. Each token's
+ * value for the configured feature is indexed at consecutive positions;
+ * optionally, the unprocessed token strings and the non-token text are also
+ * saved to a zipped document collection.
+ */
+public class AtomicTokenIndex extends AtomicIndex {
+
+  private final static Logger logger =
Logger.getLogger(AtomicTokenIndex.class);
+
+  /**
+   * A constant (empty String array) used for filtering terms from indexing.
+   * @see #calculateTermStringForAnnotation(Annotation, GATEDocument)
+   * implementation.
+   */
+  private static final String[] DO_NOT_INDEX = new String[]{};
+
+  /**
+   * A zip collection builder used to build a zip of the collection
+   * if this has been requested.
+   */
+  protected DocumentCollectionWriter collectionWriter = null;
+
+  /**
+   * An array of helpers for creating document metadata.
+   * NOTE(review): never assigned in this class, so the helper loops below
+   * only run if a subclass sets it.
+   */
+  protected DocumentMetadataHelper[] docMetadataHelpers;
+
+  /**
+   * Stores the document URI for writing to the zip collection.
+   */
+  protected String documentURI;
+
+  /**
+   * Stores the document title for writing to the zip collection.
+   */
+  protected String documentTitle;
+
+  /**
+   * Stores the document tokens for writing to the zip collection.
+   */
+  protected List<String> documentTokens;
+
+  /**
+   * Stores the document non-tokens for writing to the zip collection.
+   */
+  protected List<String> documentNonTokens;
+
+
+  /**
+   * GATE document factory used by the zip builder, and also to
+   * translate field indexes to field names.
+   * NOTE(review): never assigned or read in this class.
+   */
+  protected GATEDocumentFactory factory;
+
+
+  /**
+   * The feature name corresponding to the field.
+   */
+  protected String featureName;
+
+
+  /**
+   * The term processor used to process the feature values being indexed.
+   */
+  protected TermProcessor termProcessor;
+
+  /**
+   * Creates the token index and immediately starts its indexing thread.
+   * NOTE(review): starting the thread in the constructor lets {@code this}
+   * escape before any subclass constructor has finished.
+   * @param parent the {@link MimirIndex} containing this atomic index.
+   * @param name the name of the sub-index.
+   * @param indexDirectory the directory holding this index's files.
+   * @param hasDirectIndex should a direct index be used?
+   * @param inputQueue the input queue for documents to be indexed.
+   * @param outputQueue the output queue for documents that have been indexed.
+   * @param config supplies the feature name and the term processor.
+   * @param zipCollection whether to also write a zipped document collection.
+   */
+  public AtomicTokenIndex(MimirIndex parent, String name, File indexDirectory,
+      boolean hasDirectIndex, BlockingQueue<GATEDocument> inputQueue,
+      BlockingQueue<GATEDocument> outputQueue, TokenIndexerConfig config,
+      boolean zipCollection) {
+    super(parent, name, indexDirectory, hasDirectIndex, inputQueue,
outputQueue);
+    this.featureName = config.getFeatureName();
+    this.termProcessor = config.getTermProcessor();
+
+    if(zipCollection) {
+      logger.info("Creating zipped collection for field \"" + name + "\"");
+      collectionWriter = new DocumentCollectionWriter(indexDirectory);
+    }
+
+    indexingThread = new Thread(this, "Mimir-" + name + " indexing thread");
+    indexingThread.start();
+  }
+
+
+  /**
+   * If zipping, inform the collection builder that a new document
+   * is about to start.
+   */
+  protected void documentStarting(GATEDocument gateDocument) throws
IndexException {
+    if(collectionWriter != null) {
+      documentURI = gateDocument.uri().toString();
+      documentTitle = gateDocument.title().toString();
+      documentTokens = new LinkedList<String>();
+      documentNonTokens = new LinkedList<String>();
+      if(docMetadataHelpers != null){
+        for(DocumentMetadataHelper aHelper : docMetadataHelpers){
+          aHelper.documentStart(gateDocument);
+        }
+      }
+    }
+    // set tokenPosition to -1 so the first call to
+    // calculateStartPositionForAnnotation() yields position 0
+    tokenPosition = -1;
+  }
+
+  /**
+   * If zipping, inform the collection builder that we finished
+   * the current document.
+   */
+  protected void documentEnding(GATEDocument gateDocument) throws
IndexException {
+    if(collectionWriter != null) {
+      DocumentData docData = new DocumentData(documentURI,
+          documentTitle,
+          documentTokens.toArray(new String[documentTokens.size()]),
+          documentNonTokens.toArray(new
String[documentNonTokens.size()]));
+      if(docMetadataHelpers != null){
+        for(DocumentMetadataHelper aHelper : docMetadataHelpers){
+          aHelper.documentEnd(gateDocument, docData);
+        }
+      }
+      collectionWriter.writeDocument(docData);
+      documentTokens = null;
+      documentNonTokens = null;
+    }
+  }
+
+  /**
+   * Get the token annotations from this document, in increasing
+   * order of offset.
+   */
+  protected Annotation[] getAnnotsToProcess(GATEDocument gateDocument) {
+    return gateDocument.getTokenAnnots();
+  }
+
+  /**
+   * This indexer always adds one posting per token, so the start
+   * position for the next annotation is always one more than the
+   * previous one. documentStarting() sets tokenPosition to -1, so the
+   * first token of a document gets position 0.
+   *
+   * @param ann the token annotation (not used).
+   * @param gateDocument the document being indexed (not used).
+   */
+  protected void calculateStartPositionForAnnotation(Annotation ann,
+      GATEDocument gateDocument) {
+    tokenPosition++;
+  }
+
+  /**
+   * For a token annotation, the "string" we index is the feature value
+   * corresponding to the name of the field to index. As well as
+   * calculating the string, this method writes an entry to the zip
+   * collection builder if it exists.
+   *
+   * @param ann the token annotation.
+   * @param gateDocument the document being indexed.
+   * @return null when the processed value in currentTerm should be indexed,
+   * or {@link #DO_NOT_INDEX} when the term processor rejected it.
+   */
+  protected String[] calculateTermStringForAnnotation(Annotation ann,
+      GATEDocument gateDocument) throws IndexException {
+    FeatureMap tokenFeatures = ann.getFeatures();
+    String value = (String)tokenFeatures.get(featureName); // NOTE(review): ClassCastException if the feature value is not a String
+    currentTerm.replace(value == null ? "" : value);
+    //save the *unprocessed* term to the collection, if required.
+    if(collectionWriter != null) {
+      documentTokens.add(currentTerm.toString());
+      documentNonTokens.add(gateDocument.getNonTokens()[tokenPosition]); // assumes getNonTokens() is aligned with getTokenAnnots(); TODO confirm
+    }
+    if(termProcessor.processTerm(currentTerm)){
+      //the processor accepted the term (possibly modifying it in place): index it
+      return null;
+    }else{
+      //the processor has filtered the term -> don't index it.
+      return DO_NOT_INDEX;
+    }
+
+  }
+
+  /**
+   * Overridden to close the zip collection builder (if any) before
+   * delegating to the base implementation.
+   */
+  @Override
+  protected void flush() throws IOException {
+    if(collectionWriter != null) {
+      logger.info("Saving zipped collection");
+      collectionWriter.close();
+    }
+    super.flush();
+  }
+
+
+}
Property changes on:
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicTokenIndex.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Rapidly troubleshoot problems before they affect your business. Most IT
organizations don't have a clear picture of how application performance
affects their revenue. With AppDynamics, you get 100% visibility into your
Java,.NET, & PHP application. Start your 15-day FREE TRIAL of AppDynamics Pro!
http://pubads.g.doubleclick.net/gampad/clk?id=84349831&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs