Revision: 19561
http://sourceforge.net/p/gate/code/19561
Author: ian_roberts
Date: 2016-09-02 13:40:20 +0000 (Fri, 02 Sep 2016)
Log Message:
-----------
First complete version of this tool, ready for proper testing.
Modified Paths:
--------------
mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java
Modified: mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java
===================================================================
--- mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java	2016-09-02 01:22:34 UTC (rev 19560)
+++ mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java	2016-09-02 13:40:20 UTC (rev 19561)
@@ -14,9 +14,15 @@
*/
package gate.mimir.util;
+import gate.mimir.IndexConfig;
+import gate.mimir.IndexConfig.SemanticIndexerConfig;
+import gate.mimir.IndexConfig.TokenIndexerConfig;
+import gate.mimir.MimirIndex;
import gate.mimir.index.AtomicIndex;
import gate.mimir.index.DocumentCollection;
import it.unimi.di.big.mg4j.index.CompressionFlags;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
import it.unimi.di.big.mg4j.index.DiskBasedIndex;
import it.unimi.di.big.mg4j.index.Index;
import it.unimi.di.big.mg4j.index.IndexIterator;
@@ -27,10 +33,13 @@
import it.unimi.di.big.mg4j.io.IOFactory;
import it.unimi.di.big.mg4j.tool.Scan;
import it.unimi.dsi.big.io.FileLinesCollection;
+import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
import it.unimi.dsi.bits.Fast;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.io.BinIO;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import it.unimi.dsi.io.InputBitStream;
@@ -51,8 +60,10 @@
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
@@ -225,8 +236,25 @@
trimBatch(indexDirectory, batches.names[endBatch], totalDocsInZips - endOfPreviousBatch);
- // 4.3. Rebuild the direct indexes for those AtomicIndexes that
+ // 4.3. Truncate the direct indexes for those AtomicIndexes that
// require it
+ IndexConfig indexConfig =
+ IndexConfig.readConfigFromFile(new File(indexDirectory,
+ MimirIndex.INDEX_CONFIG_FILENAME));
+ TokenIndexerConfig[] tokenIndexes = indexConfig.getTokenIndexers();
+ for(int i = 0; i < tokenIndexes.length; i++) {
+ if(tokenIndexes[i].isDirectIndexEnabled()) {
+ truncateDirectIndex(indexDirectory, "token-" + i,
+ batches.names[endBatch], totalDocsInZips - 1);
+ }
+ }
+ SemanticIndexerConfig[] semanticIndexes =
+ indexConfig.getSemanticIndexers();
+ for(int i = 0; i < semanticIndexes.length; i++) {
+ if(semanticIndexes[i].isDirectIndexEnabled()) {
+ truncateDirectIndex(indexDirectory, "mention-" + i,
+ batches.names[endBatch], totalDocsInZips - 1);
+ }
+ }
}
public static void repairLastZip(File indexDirectory) throws IOException {
@@ -626,14 +654,15 @@
// write the truncated sizes file
File stashedSizesFile =
new File(stashedIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
- InputBitStream stashedSizesStream = new InputBitStream(stashedSizesFile);
File sizesFile =
new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
- OutputBitStream sizesStream = new OutputBitStream(sizesFile);
- for(long i = 0; i < numDocs; i++) {
- sizesStream.writeGamma(stashedSizesStream.readGamma());
+ try(InputBitStream stashedSizesStream =
+ new InputBitStream(stashedSizesFile);
+ OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
+ for(long i = 0; i < numDocs; i++) {
+ sizesStream.writeGamma(stashedSizesStream.readGamma());
+ }
}
- sizesStream.close();
// generate the index properties
Properties stashedProps = new Properties();
@@ -647,14 +676,205 @@
stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
// -1 means unknown
- newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE,
- maxDocSize);
+ newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxDocSize);
newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
- newProps.setProperty(Index.PropertyKeys.OCCURRENCES,
- totalOccurrences);
+ newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
writerProperties.addAll(newProps);
Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
}
}
+
+ /**
+ * Truncate the given direct index to remove documents beyond the
+ * given lastDocId. The original version of the batch is assumed to
+ * have been stashed as broken-batches/subIndexName-batchName.
+ *
+ * @param indexDirectory the top-level index directory
+ * @param subIndexName the name of the sub-index (token-N or
+ * mention-N)
+ * @param batchName the name of the batch (head or tail-N)
+ * @param lastDocId the last valid document ID
+ */
+ public static void truncateDirectIndex(File indexDirectory,
+ String subIndexName, String batchName, long lastDocId)
+ throws Exception {
+ File brokenBatches = new File(indexDirectory, "broken-batches");
+ File stashedBatch = new File(brokenBatches, subIndexName + "-" + batchName);
+ if(!stashedBatch.exists()) {
+ throw new RuntimeException("Stashed batch " + stashedBatch + " not found");
+ }
+ File batchDir = new File(new File(indexDirectory, subIndexName), batchName);
+ batchDir.mkdirs();
+ log.info("Trimming direct index for batch " + batchDir);
+
+ String stashedIndexBasename =
+ new File(stashedBatch, subIndexName
+ + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
+ String outputIndexBasename =
+ new File(batchDir, subIndexName
+ + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
+
+ // A direct index is modelled in Mimir as an inverted index where
+ // the terms are documents and vice versa. The "term string" is a
+ // zero-padded hex representation of the document ID, so we simply
+ // need to stream "inverted" lists from the stashed index to the new
+ // one until we reach the term string that is the hex representation
+ // of lastDocId.
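+ // For example, assuming longToTerm zero-pads to 16 hex digits,
+ // document ID 255 would appear as the term string "00000000000000ff".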
+
+ MutableString lastDocIdAsHex =
+ new MutableString(AtomicIndex.longToTerm(lastDocId));
+
+ // determine the number of documents in this direct index (i.e. the
+ // number of entries in the .terms file that are lexicographically
+ // <= lastDocIdAsHex)
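+ // (zero-padding makes the hex strings fixed-width, so lexicographic
+ // order agrees with numeric document ID order)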
+ long numDocsInIndex = 0;
+
+ File stashedTermsFile =
+ new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
+ FileLinesCollection termsColl =
+ new FileLinesCollection(stashedTermsFile.getAbsolutePath(), "UTF-8");
+ try(FileLinesIterator docIdsIter = termsColl.iterator()) {
+ while(docIdsIter.hasNext()
+ && docIdsIter.next().compareTo(lastDocIdAsHex) <= 0) {
+ numDocsInIndex++;
+ }
+ }
+ log.info("Trimmed index will contain " + numDocsInIndex + " documents");
+
+ // write the truncated "terms" file, term map and bloom filter
+ BloomFilter<Void> docBloomFilter = BloomFilter.create(numDocsInIndex);
+
+ try(FileLinesIterator docIdsIter = termsColl.iterator();
+ PrintWriter pw =
+ new PrintWriter(new OutputStreamWriter(
+ new FastBufferedOutputStream(new FileOutputStream(
+ outputIndexBasename
+ + DiskBasedIndex.TERMS_EXTENSION),
+ 64 * 1024), "UTF-8"))) {
+ for(long i = 0; i < numDocsInIndex; i++) {
+ MutableString t = docIdsIter.next();
+ t.println(pw);
+ docBloomFilter.add(t);
+ }
+ }
+ AtomicIndex.generateTermMap(new File(outputIndexBasename
+ + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
+ + DiskBasedIndex.TERMMAP_EXTENSION), null);
+ BinIO.storeObject(docBloomFilter, new File(outputIndexBasename
+ + DocumentalCluster.BLOOM_EXTENSION));
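+ // the Bloom filter lets readers cheaply rule out document IDs that
+ // are not present in this batch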
+
+ // stream "inverted lists" (i.e. documents) from the stashed to the
+ // new index, and build up a cache of "document sizes" (i.e. the
+ // total number of occurrences of each term referenced in this
+ // index). We can't simply use the sizes from the stashed index
+ // because they will include the counts from the inverted lists
+ // we're trimming off.
+ Long2IntOpenHashMap termSizes = new Long2IntOpenHashMap();
+ termSizes.defaultReturnValue(0);
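+ // maps direct-index "document pointers" (i.e. term IDs) to the
+ // accumulated counts of each term across the kept documents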
+
+ // we need the total potential number of direct terms to create the
+ // index writer
+ File directTermsFile =
+ new File(new File(indexDirectory, subIndexName),
+ AtomicIndex.DIRECT_TERMS_FILENAME);
+ FileLinesCollection directTerms =
+ new FileLinesCollection(directTermsFile.getAbsolutePath(), "UTF-8");
+
+ Index stashedIndex = Index.getInstance(stashedIndexBasename, true, false);
+
+ int maxCount = 0;
+ long totalOccurrences = 0;
+ long writtenBits = 0;
+ int maxTermSize = -1; // -1 means unknown
+ Properties writerProperties;
+
+ try(IndexReader indexReader = stashedIndex.getReader()) {
+ // copy the default compression flags, and remove positions
+ Map<Component, Coding> flags =
+ new HashMap<Component, Coding>(
+ CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
+ flags.remove(Component.POSITIONS);
+ QuasiSuccinctIndexWriter directIndexWriter =
+ new QuasiSuccinctIndexWriter(
+ IOFactory.FILESYSTEM_FACTORY,
+ outputIndexBasename,
+ directTerms.size64(),
+ Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
+ QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE, flags,
+ ByteOrder.nativeOrder());
+ IndexIterator iter;
+ int docCounter = 0;
+ while((iter = indexReader.nextIterator()) != null
+ && ++docCounter <= numDocsInIndex) {
+ // annoyingly we can't stream straight from the old inverted
+ // list to the new one, as we need to know up front the total
+ // occurrences value which is not exposed through any public
+ // API.
+ // note: reset occurrences for each inverted list, since
+ // newInvertedList expects the per-list total rather than a
+ // running total across lists
+ long occurrences = 0;
+ LongList docPointers = new LongArrayList();
+ IntList counts = new IntArrayList();
+ long frequency = iter.frequency();
+ long curPointer;
+ while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
+ docPointers.add(curPointer);
+ counts.add(iter.count());
+ termSizes.put(curPointer, termSizes.get(curPointer) + iter.count());
+ occurrences += iter.count();
+ totalOccurrences += iter.count();
+ if(iter.count() > maxCount) {
+ maxCount = iter.count();
+ }
+ }
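+ // the final 0 passed to newInvertedList is the sum of maximum
+ // positions, which is irrelevant here as positions are not stored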
+ directIndexWriter.newInvertedList(frequency, occurrences, 0);
+ directIndexWriter.writeFrequency(frequency);
+ for(int i = 0; i < frequency; i++) {
+ OutputBitStream obs = directIndexWriter.newDocumentRecord();
+ directIndexWriter.writeDocumentPointer(obs, docPointers.get(i));
+ directIndexWriter.writePositionCount(obs, counts.get(i));
+ // no positions in a direct index
+ }
+ }
+ directIndexWriter.close();
+ writtenBits = directIndexWriter.writtenBits();
+
+ // write the new sizes file
+ File sizesFile = new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
+ try(OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
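+ // one gamma-coded size per potential direct term; terms absent
+ // from the kept lists fall back to the default value of 0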
+ for(long i = 0; i < directTerms.size64(); i++) {
+ int termSize = termSizes.get(i);
+ sizesStream.writeGamma(termSize);
+ if(termSize > maxTermSize) {
+ maxTermSize = termSize;
+ }
+ }
+ }
+ writerProperties = directIndexWriter.properties();
+ // write stats file
+ try(PrintStream statsPs =
+ new PrintStream(new File(outputIndexBasename
+ + DiskBasedIndex.STATS_EXTENSION))) {
+ directIndexWriter.printStats(statsPs);
+ }
+ }
+
+ // generate the index properties
+ Properties stashedProps = new Properties();
+ try(FileInputStream stashedPropsStream =
+ new FileInputStream(stashedIndexBasename
+ + DiskBasedIndex.PROPERTIES_EXTENSION)) {
+ stashedProps.load(stashedPropsStream);
+ }
+ Properties newProps = new Properties();
+ newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
+ stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
+ newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
+ // -1 means unknown
+ newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxTermSize);
+ newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
+ newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
+ writerProperties.addAll(newProps);
+ Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
+ outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
+ }
}
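
For anyone wanting to exercise the new code path in isolation, a minimal
driver might look like the sketch below. The index path and document ID are
placeholders, and the sub-index and batch names simply follow the token-N /
head conventions described in the javadoc; this is an illustration, not part
of the commit.

import gate.mimir.util.TruncateIndex;
import java.io.File;

public class TruncateDirectIndexDemo {
  public static void main(String[] args) throws Exception {
    // hypothetical index location, with the broken batch already stashed
    // under broken-batches/token-0-head
    File indexDirectory = new File("/data/mimir/index-1");
    // truncate the direct index of sub-index "token-0", batch "head",
    // keeping only documents with IDs 0..99
    TruncateIndex.truncateDirectIndex(indexDirectory, "token-0", "head", 99L);
  }
}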