Revision: 19561
http://sourceforge.net/p/gate/code/19561
Author: ian_roberts
Date: 2016-09-02 13:40:20 +0000 (Fri, 02 Sep 2016)
Log Message:
-----------
First complete version of this tool, ready for proper testing.
Modified Paths: -------------- mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java Modified: mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java =================================================================== --- mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java 2016-09-02 01:22:34 UTC (rev 19560) +++ mimir/trunk/mimir-core/src/gate/mimir/util/TruncateIndex.java 2016-09-02 13:40:20 UTC (rev 19561) @@ -14,9 +14,15 @@ */ package gate.mimir.util; +import gate.mimir.IndexConfig; +import gate.mimir.IndexConfig.SemanticIndexerConfig; +import gate.mimir.IndexConfig.TokenIndexerConfig; +import gate.mimir.MimirIndex; import gate.mimir.index.AtomicIndex; import gate.mimir.index.DocumentCollection; import it.unimi.di.big.mg4j.index.CompressionFlags; +import it.unimi.di.big.mg4j.index.CompressionFlags.Coding; +import it.unimi.di.big.mg4j.index.CompressionFlags.Component; import it.unimi.di.big.mg4j.index.DiskBasedIndex; import it.unimi.di.big.mg4j.index.Index; import it.unimi.di.big.mg4j.index.IndexIterator; @@ -27,10 +33,13 @@ import it.unimi.di.big.mg4j.io.IOFactory; import it.unimi.di.big.mg4j.tool.Scan; import it.unimi.dsi.big.io.FileLinesCollection; +import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator; import it.unimi.dsi.bits.Fast; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.io.BinIO; +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; import it.unimi.dsi.io.InputBitStream; @@ -51,8 +60,10 @@ import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; @@ -225,8 +236,25 @@ 
trimBatch(indexDirectory, batches.names[endBatch], totalDocsInZips - endOfPreviousBatch); - // 4.3. Rebuild the direct indexes for those AtomicIndexes that + // 4.3. Truncate the direct indexes for those AtomicIndexes that // require it + IndexConfig indexConfig = + IndexConfig.readConfigFromFile(new File(indexDirectory, + MimirIndex.INDEX_CONFIG_FILENAME)); + TokenIndexerConfig[] tokenIndexes = indexConfig.getTokenIndexers(); + for(int i = 0; i < tokenIndexes.length; i++) { + if(tokenIndexes[i].isDirectIndexEnabled()) { + truncateDirectIndex(indexDirectory, "token-" + i, + batches.names[endBatch], totalDocsInZips - 1); + } + } + SemanticIndexerConfig[] semanticIndexes = indexConfig.getSemanticIndexers(); + for(int i = 0; i < semanticIndexes.length; i++) { + if(semanticIndexes[i].isDirectIndexEnabled()) { + truncateDirectIndex(indexDirectory, "mention-" + i, + batches.names[endBatch], totalDocsInZips - 1); + } + } } public static void repairLastZip(File indexDirectory) throws IOException { @@ -626,14 +654,15 @@ // write the truncated sizes file File stashedSizesFile = new File(stashedIndexBasename + DiskBasedIndex.SIZES_EXTENSION); - InputBitStream stashedSizesStream = new InputBitStream(stashedSizesFile); File sizesFile = new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION); - OutputBitStream sizesStream = new OutputBitStream(sizesFile); - for(long i = 0; i < numDocs; i++) { - sizesStream.writeGamma(stashedSizesStream.readGamma()); + try(InputBitStream stashedSizesStream = + new InputBitStream(stashedSizesFile); + OutputBitStream sizesStream = new OutputBitStream(sizesFile)) { + for(long i = 0; i < numDocs; i++) { + sizesStream.writeGamma(stashedSizesStream.readGamma()); + } } - sizesStream.close(); // generate the index properties Properties stashedProps = new Properties(); @@ -647,14 +676,205 @@ stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR)); newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits); // -1 means unknown - 
newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, - maxDocSize); + newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxDocSize); newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount); - newProps.setProperty(Index.PropertyKeys.OCCURRENCES, - totalOccurrences); + newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences); writerProperties.addAll(newProps); Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties, outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION); } } + + /** + * Truncate the given direct index to remove documents beyond the + * given lastDocId. The original version of the batch is assumed to + * have been stashed as broken-batches/subIndexName-batchName + * + * @param indexDirectory the top-level index directory + * @param subIndexName the name of the sub-index (token-N or + * mention-N) + * @param batchName the name of the batch (head or tail-N) + * @param lastDocId the last valid document ID + */ + public static void truncateDirectIndex(File indexDirectory, + String subIndexName, String batchName, long lastDocId) + throws Exception { + File brokenBatches = new File(indexDirectory, "broken-batches"); + File stashedBatch = new File(brokenBatches, subIndexName + "-" + batchName); + if(!stashedBatch.exists()) { + throw new RuntimeException("Stashed batch " + stashedBatch + " not found"); + } + File batchDir = new File(new File(indexDirectory, subIndexName), batchName); + batchDir.mkdirs(); + log.info("Trimming direct index for batch " + batchDir); + + String stashedIndexBasename = + new File(stashedBatch, subIndexName + + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath(); + String outputIndexBasename = + new File(batchDir, subIndexName + + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath(); + + // A direct index is modelled in Mimir as an inverted index where + // the terms are documents and vice versa. 
The "term string" is a + // zero-padded hex representation of the document ID, so we simply + // need to stream "inverted" lists from the stashed index to the new + // one until we reach the term string that is the hex representation + // of lastDocId. + + MutableString lastDocIdAsHex = + new MutableString(AtomicIndex.longToTerm(lastDocId)); + + // determine the number of documents in this direct index (i.e. the + // number of entries in the .terms file that are lexicographically + // <= lastDocIdAsHex) + long numDocsInIndex = 0; + + File stashedTermsFile = + new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION); + FileLinesCollection termsColl = + new FileLinesCollection(stashedTermsFile.getAbsolutePath(), "UTF-8"); + try(FileLinesIterator docIdsIter = termsColl.iterator()) { + while(docIdsIter.hasNext() + && docIdsIter.next().compareTo(lastDocIdAsHex) <= 0) { + numDocsInIndex++; + } + } + log.info("Trimmed index will contain " + numDocsInIndex + " documents"); + + // write the truncated "terms" file, term map and bloom filter + BloomFilter<Void> docBloomFilter = BloomFilter.create(numDocsInIndex); + + try(FileLinesIterator docIdsIter = termsColl.iterator(); + PrintWriter pw = + new PrintWriter(new OutputStreamWriter( + new FastBufferedOutputStream(new FileOutputStream( + outputIndexBasename + + DiskBasedIndex.TERMS_EXTENSION), + 64 * 1024), "UTF-8"))) { + for(long i = 0; i < numDocsInIndex; i++) { + MutableString t = docIdsIter.next(); + t.println(pw); + docBloomFilter.add(t); + } + } + AtomicIndex.generateTermMap(new File(outputIndexBasename + + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename + + DiskBasedIndex.TERMMAP_EXTENSION), null); + BinIO.storeObject(docBloomFilter, new File(outputIndexBasename + + DocumentalCluster.BLOOM_EXTENSION)); + + // stream "inverted lists" (i.e. documents) from the stashed to the + // new index, and build up a cache of "document sizes" (i.e. 
the + // number of documents that contain each term referenced in this + // index). We can't simply use the sizes from the stashed index + // because they will include the counts from the inverted lists + // we're trimming off. + Long2IntOpenHashMap termSizes = new Long2IntOpenHashMap(); + termSizes.defaultReturnValue(0); + + // we need the total potential number of direct terms to create the + // index writer + File directTermsFile = + new File(new File(indexDirectory, subIndexName), + AtomicIndex.DIRECT_TERMS_FILENAME); + FileLinesCollection directTerms = + new FileLinesCollection(directTermsFile.getAbsolutePath(), "UTF-8"); + + Index stashedIndex = Index.getInstance(stashedIndexBasename, true, false); + + int maxCount = 0; + long totalOccurrences = 0; + long writtenBits = 0; + int maxTermSize = -1; // -1 means unknown + Properties writerProperties; + + try(IndexReader indexReader = stashedIndex.getReader()) { + // copy the default compression flags, and remove positions + Map<Component, Coding> flags = + new HashMap<Component, Coding>( + CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX); + flags.remove(Component.POSITIONS); + QuasiSuccinctIndexWriter directIndexWriter = + new QuasiSuccinctIndexWriter( + IOFactory.FILESYSTEM_FACTORY, + outputIndexBasename, + directTerms.size64(), + Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM), + QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE, flags, + ByteOrder.nativeOrder()); + IndexIterator iter; + int docCounter = 0; + long occurrences = 0; + while((iter = indexReader.nextIterator()) != null + && ++docCounter <= numDocsInIndex) { + // annoyingly we can't stream straight from the old inverted + // list to the new one, as we need to know up front the total + // occurrences value which is not exposed through any public + // API. 
+ LongList docPointers = new LongArrayList(); + IntList counts = new IntArrayList(); + long frequency = iter.frequency(); + long curPointer; + while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) { + docPointers.add(curPointer); + counts.add(iter.count()); + termSizes.put(curPointer, termSizes.get(curPointer) + iter.count()); + occurrences += iter.count(); + totalOccurrences += iter.count(); + if(iter.count() > maxCount) { + maxCount = iter.count(); + } + } + directIndexWriter.newInvertedList(frequency, occurrences, 0); + directIndexWriter.writeFrequency(frequency); + for(int i = 0; i < frequency; i++) { + OutputBitStream obs = directIndexWriter.newDocumentRecord(); + directIndexWriter.writeDocumentPointer(obs, docPointers.get(i)); + directIndexWriter.writePositionCount(obs, counts.get(i)); + // no positions in a direct index + } + } + directIndexWriter.close(); + writtenBits = directIndexWriter.writtenBits(); + + // write the new sizes file + File sizesFile = new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION); + try(OutputBitStream sizesStream = new OutputBitStream(sizesFile)) { + for(long i = 0; i < directTerms.size64(); i++) { + int termSize = termSizes.get(i); + sizesStream.writeGamma(termSize); + if(termSize > maxTermSize) { + maxTermSize = termSize; + } + } + } + writerProperties = directIndexWriter.properties(); + // write stats file + try(PrintStream statsPs = + new PrintStream(new File(outputIndexBasename + + DiskBasedIndex.STATS_EXTENSION))) { + directIndexWriter.printStats(statsPs); + } + } + + // generate the index properties + Properties stashedProps = new Properties(); + try(FileInputStream stashedPropsStream = + new FileInputStream(stashedIndexBasename + + DiskBasedIndex.PROPERTIES_EXTENSION)) { + stashedProps.load(stashedPropsStream); + } + Properties newProps = new Properties(); + newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR, + stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR)); + 
newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits); + // -1 means unknown + newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxTermSize); + newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount); + newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences); + writerProperties.addAll(newProps); + Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties, + outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ _______________________________________________ GATE-cvs mailing list GATE-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/gate-cvs