Revision: 19561
          http://sourceforge.net/p/gate/code/19561
Author:   ian_roberts
Date:     2016-09-02 13:40:20 +0000 (Fri, 02 Sep 2016)
Log Message:
-----------
First complete version of this tool, ready for proper testing.

Modified Paths:
--------------
    mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java

Modified: mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java
===================================================================
--- mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java       
2016-09-02 01:22:34 UTC (rev 19560)
+++ mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java       
2016-09-02 13:40:20 UTC (rev 19561)
@@ -14,9 +14,15 @@
  */
 package gate.mimir.util;
 
+import gate.mimir.IndexConfig;
+import gate.mimir.IndexConfig.SemanticIndexerConfig;
+import gate.mimir.IndexConfig.TokenIndexerConfig;
+import gate.mimir.MimirIndex;
 import gate.mimir.index.AtomicIndex;
 import gate.mimir.index.DocumentCollection;
 import it.unimi.di.big.mg4j.index.CompressionFlags;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
 import it.unimi.di.big.mg4j.index.DiskBasedIndex;
 import it.unimi.di.big.mg4j.index.Index;
 import it.unimi.di.big.mg4j.index.IndexIterator;
@@ -27,10 +33,13 @@
 import it.unimi.di.big.mg4j.io.IOFactory;
 import it.unimi.di.big.mg4j.tool.Scan;
 import it.unimi.dsi.big.io.FileLinesCollection;
+import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
 import it.unimi.dsi.bits.Fast;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.io.BinIO;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
 import it.unimi.dsi.io.InputBitStream;
@@ -51,8 +60,10 @@
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
@@ -225,8 +236,25 @@
     trimBatch(indexDirectory, batches.names[endBatch], totalDocsInZips
             - endOfPreviousBatch);
 
-    // 4.3. Rebuild the direct indexes for those AtomicIndexes that
+    // 4.3. Truncate the direct indexes for those AtomicIndexes that
     // require it
+    IndexConfig indexConfig =
+            IndexConfig.readConfigFromFile(new File(indexDirectory,
+                    MimirIndex.INDEX_CONFIG_FILENAME));
+    TokenIndexerConfig[] tokenIndexes = indexConfig.getTokenIndexers();
+    for(int i = 0; i < tokenIndexes.length; i++) {
+      if(tokenIndexes[i].isDirectIndexEnabled()) {
+        truncateDirectIndex(indexDirectory, "token-" + i,
+                batches.names[endBatch], totalDocsInZips - 1);
+      }
+    }
+    SemanticIndexerConfig[] semanticIndexes = 
indexConfig.getSemanticIndexers();
+    for(int i = 0; i < semanticIndexes.length; i++) {
+      if(semanticIndexes[i].isDirectIndexEnabled()) {
+        truncateDirectIndex(indexDirectory, "mention-" + i,
+                batches.names[endBatch], totalDocsInZips - 1);
+      }
+    }
   }
 
   public static void repairLastZip(File indexDirectory) throws IOException {
@@ -626,14 +654,15 @@
       // write the truncated sizes file
       File stashedSizesFile =
               new File(stashedIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
-      InputBitStream stashedSizesStream = new InputBitStream(stashedSizesFile);
       File sizesFile =
               new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
-      OutputBitStream sizesStream = new OutputBitStream(sizesFile);
-      for(long i = 0; i < numDocs; i++) {
-        sizesStream.writeGamma(stashedSizesStream.readGamma());
+      try(InputBitStream stashedSizesStream =
+              new InputBitStream(stashedSizesFile);
+              OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
+        for(long i = 0; i < numDocs; i++) {
+          sizesStream.writeGamma(stashedSizesStream.readGamma());
+        }
       }
-      sizesStream.close();
 
       // generate the index properties
       Properties stashedProps = new Properties();
@@ -647,14 +676,205 @@
               stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
       newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
       // -1 means unknown
-      newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE,
-              maxDocSize);
+      newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxDocSize);
       newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
-      newProps.setProperty(Index.PropertyKeys.OCCURRENCES,
-              totalOccurrences);
+      newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
       writerProperties.addAll(newProps);
       Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
               outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
     }
   }
+
+  /**
+   * Truncate the given direct index to remove documents beyond the
+   * given lastDocId. The original version of the batch is assumed to
+   * have been stashed as broken-batches/subIndexName-batchName
+   * 
+   * @param indexDirectory the top-level index directory
+   * @param subIndexName the name of the sub-index (token-N or
+   *          mention-N)
+   * @param batchName the name of the batch (head or tail-N)
+   * @param lastDocId the last valid document ID
+   */
+  public static void truncateDirectIndex(File indexDirectory,
+          String subIndexName, String batchName, long lastDocId)
+          throws Exception {
+    File brokenBatches = new File(indexDirectory, "broken-batches");
+    File stashedBatch = new File(brokenBatches, subIndexName + "-" + 
batchName);
+    if(!stashedBatch.exists()) {
+      throw new RuntimeException("Stashed batch " + stashedBatch + " not 
found");
+    }
+    File batchDir = new File(new File(indexDirectory, subIndexName), 
batchName);
+    batchDir.mkdirs();
+    log.info("Trimming direct index for batch " + batchDir);
+
+    String stashedIndexBasename =
+            new File(stashedBatch, subIndexName
+                    + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
+    String outputIndexBasename =
+            new File(batchDir, subIndexName
+                    + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
+
+    // A direct index is modelled in Mimir as an inverted index where
+    // the terms are documents and vice versa. The "term string" is a
+    // zero-padded hex representation of the document ID, so we simply
+    // need to stream "inverted" lists from the stashed index to the new
+    // one until we reach the term string that is the hex representation
+    // of lastDocId.
+
+    MutableString lastDocIdAsHex =
+            new MutableString(AtomicIndex.longToTerm(lastDocId));
+
+    // determine the number of documents in this direct index (i.e. the
+    // number of entries in the .terms file that are lexicographically
+    // <= lastDocIdAsHex)
+    long numDocsInIndex = 0;
+
+    File stashedTermsFile =
+            new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
+    FileLinesCollection termsColl =
+            new FileLinesCollection(stashedTermsFile.getAbsolutePath(), 
"UTF-8");
+    try(FileLinesIterator docIdsIter = termsColl.iterator()) {
+      while(docIdsIter.hasNext()
+              && docIdsIter.next().compareTo(lastDocIdAsHex) <= 0) {
+        numDocsInIndex++;
+      }
+    }
+    log.info("Trimmed index will contain " + numDocsInIndex + " documents");
+
+    // write the truncated "terms" file, term map and bloom filter
+    BloomFilter<Void> docBloomFilter = BloomFilter.create(numDocsInIndex);
+
+    try(FileLinesIterator docIdsIter = termsColl.iterator();
+            PrintWriter pw =
+                    new PrintWriter(new OutputStreamWriter(
+                            new FastBufferedOutputStream(new FileOutputStream(
+                                    outputIndexBasename
+                                            + DiskBasedIndex.TERMS_EXTENSION),
+                                    64 * 1024), "UTF-8"))) {
+      for(long i = 0; i < numDocsInIndex; i++) {
+        MutableString t = docIdsIter.next();
+        t.println(pw);
+        docBloomFilter.add(t);
+      }
+    }
+    AtomicIndex.generateTermMap(new File(outputIndexBasename
+            + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
+            + DiskBasedIndex.TERMMAP_EXTENSION), null);
+    BinIO.storeObject(docBloomFilter, new File(outputIndexBasename
+            + DocumentalCluster.BLOOM_EXTENSION));
+
+    // stream "inverted lists" (i.e. documents) from the stashed to the
+    // new index, and build up a cache of "document sizes" (i.e. the
+    // number of documents that contain each term referenced in this
+    // index). We can't simply use the sizes from the stashed index
+    // because they will include the counts from the inverted lists
+    // we're trimming off.
+    Long2IntOpenHashMap termSizes = new Long2IntOpenHashMap();
+    termSizes.defaultReturnValue(0);
+
+    // we need the total potential number of direct terms to create the
+    // index writer
+    File directTermsFile =
+            new File(new File(indexDirectory, subIndexName),
+                    AtomicIndex.DIRECT_TERMS_FILENAME);
+    FileLinesCollection directTerms =
+            new FileLinesCollection(directTermsFile.getAbsolutePath(), 
"UTF-8");
+
+    Index stashedIndex = Index.getInstance(stashedIndexBasename, true, false);
+
+    int maxCount = 0;
+    long totalOccurrences = 0;
+    long writtenBits = 0;
+    int maxTermSize = -1; // -1 means unknown
+    Properties writerProperties;
+    
+    try(IndexReader indexReader = stashedIndex.getReader()) {
+      // copy the default compression flags, and remove positions
+      Map<Component, Coding> flags =
+              new HashMap<Component, Coding>(
+                      CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
+      flags.remove(Component.POSITIONS);
+      QuasiSuccinctIndexWriter directIndexWriter =
+              new QuasiSuccinctIndexWriter(
+                      IOFactory.FILESYSTEM_FACTORY,
+                      outputIndexBasename,
+                      directTerms.size64(),
+                      
Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
+                      QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE, flags,
+                      ByteOrder.nativeOrder());
+      IndexIterator iter;
+      int docCounter = 0;
+      long occurrences = 0;
+      while((iter = indexReader.nextIterator()) != null
+              && ++docCounter <= numDocsInIndex) {
+        // annoyingly we can't stream straight from the old inverted
+        // list to the new one, as we need to know up front the total
+        // occurrences value which is not exposed through any public
+        // API.
+        LongList docPointers = new LongArrayList();
+        IntList counts = new IntArrayList();
+        long frequency = iter.frequency();
+        long curPointer;
+        while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) 
{
+          docPointers.add(curPointer);
+          counts.add(iter.count());
+          termSizes.put(curPointer, termSizes.get(curPointer) + iter.count());
+          occurrences += iter.count();
+          totalOccurrences += iter.count();
+          if(iter.count() > maxCount) {
+            maxCount = iter.count();
+          }
+        }
+        directIndexWriter.newInvertedList(frequency, occurrences, 0);
+        directIndexWriter.writeFrequency(frequency);
+        for(int i = 0; i < frequency; i++) {
+          OutputBitStream obs = directIndexWriter.newDocumentRecord();
+          directIndexWriter.writeDocumentPointer(obs, docPointers.get(i));
+          directIndexWriter.writePositionCount(obs, counts.get(i));
+          // no positions in a direct index
+        }
+      }
+      directIndexWriter.close();
+      writtenBits = directIndexWriter.writtenBits();
+      
+      // write the new sizes file
+      File sizesFile = new File(outputIndexBasename + 
DiskBasedIndex.SIZES_EXTENSION);
+      try(OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
+        for(long i = 0; i < directTerms.size64(); i++) {
+          int termSize = termSizes.get(i);
+          sizesStream.writeGamma(termSize);
+          if(termSize > maxTermSize) {
+            maxTermSize = termSize;
+          }
+        }
+      }
+      writerProperties = directIndexWriter.properties();
+      // write stats file
+      try(PrintStream statsPs =
+              new PrintStream(new File(outputIndexBasename
+                      + DiskBasedIndex.STATS_EXTENSION))) {
+        directIndexWriter.printStats(statsPs);
+      }
+    }
+    
+    // generate the index properties
+    Properties stashedProps = new Properties();
+    try(FileInputStream stashedPropsStream =
+            new FileInputStream(stashedIndexBasename
+                    + DiskBasedIndex.PROPERTIES_EXTENSION)) {
+      stashedProps.load(stashedPropsStream);
+    }
+    Properties newProps = new Properties();
+    newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
+            stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
+    newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
+    // -1 means unknown
+    newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxTermSize);
+    newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
+    newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
+    writerProperties.addAll(newProps);
+    Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
+            outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
+  }
 }

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
_______________________________________________
GATE-cvs mailing list
GATE-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to