Revision: 17206
http://sourceforge.net/p/gate/code/17206
Author: valyt
Date: 2013-12-24 16:30:52 +0000 (Tue, 24 Dec 2013)
Log Message:
-----------
First sketch implementation for live-compressing an index while it is indexing
(and servicing searches), all at the same time.
Modified Paths:
--------------
mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java
mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java
mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2013-12-23 16:18:08 UTC (rev 17205)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
2013-12-24 16:30:52 UTC (rev 17206)
@@ -19,6 +19,7 @@
import gate.mimir.MimirIndex;
import gate.mimir.index.mg4j.GATEDocument;
import gate.util.GateRuntimeException;
+import it.unimi.di.big.mg4j.index.BitStreamIndex;
import it.unimi.di.big.mg4j.index.CompressionFlags;
import it.unimi.di.big.mg4j.index.DiskBasedIndex;
import it.unimi.di.big.mg4j.index.Index;
@@ -26,14 +27,21 @@
import it.unimi.di.big.mg4j.index.IndexWriter;
import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
+import it.unimi.di.big.mg4j.index.SkipBitStreamIndexWriter;
import it.unimi.di.big.mg4j.index.TermProcessor;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
import it.unimi.di.big.mg4j.index.Index.UriKeys;
import it.unimi.di.big.mg4j.index.cluster.ContiguousDocumentalStrategy;
import it.unimi.di.big.mg4j.index.cluster.DocumentalCluster;
import it.unimi.di.big.mg4j.index.cluster.DocumentalConcatenatedCluster;
import it.unimi.di.big.mg4j.io.IOFactory;
+import it.unimi.di.big.mg4j.tool.Combine;
+import it.unimi.di.big.mg4j.tool.Concatenate;
import it.unimi.di.big.mg4j.tool.Scan;
+import it.unimi.di.big.mg4j.tool.Combine.IndexType;
import it.unimi.dsi.big.io.FileLinesCollection;
+import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
import it.unimi.dsi.big.util.ShiftAddXorSignedStringMap;
import it.unimi.dsi.big.util.StringMap;
import it.unimi.dsi.bits.Fast;
@@ -49,6 +57,7 @@
import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
import it.unimi.dsi.io.OutputBitStream;
import it.unimi.dsi.lang.MutableString;
+import it.unimi.dsi.logging.ProgressLogger;
import it.unimi.dsi.sux4j.mph.LcpMonotoneMinimalPerfectHashFunction;
import it.unimi.dsi.util.BloomFilter;
import it.unimi.dsi.util.Properties;
@@ -64,6 +73,7 @@
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.configuration.ConfigurationException;
@@ -259,14 +269,14 @@
protected File indexDir;
protected Index index;
protected BloomFilter<Void> termFilter;
- int numberOfDocuments;
- int numberOfTerms;
+ long numberOfDocuments;
+ long numberOfTerms;
long numberOfPostings;
long numberOfOccurences;
int maxCount;
public MG4JIndex(Index index, File indexDir,
BloomFilter<Void> termFilter,
- int numberOfDocuments, int numberOfTerms, long numberOfPostings,
+ long numberOfDocuments, long numberOfTerms, long numberOfPostings,
long numberOfOccurences, int maxCount) {
super();
this.index = index;
@@ -283,18 +293,30 @@
/**
* Given a terms file (text file with one term per line) this method
generates
* the corresponding termmap file (binary representation of a StringMap).
+ * Optionally, a {@link BloomFilter} can also be generated, if the suitable
+ * target file is provided.
* @param termsFile the input file
* @param termmapFile the output file
+ * @param bloomFilterFile if not null, the file to be used for writing
+ * the {@link BloomFilter} for the index.
* @throws IOException
*/
- public static void generateTermMap(File termsFile, File termmapFile) throws
IOException {
+ public static void generateTermMap(File termsFile, File termmapFile,
+ File bloomFilterFile) throws IOException {
FileLinesCollection fileLinesCollection =
new FileLinesCollection(termsFile.getAbsolutePath(), "UTF-8");
StringMap<CharSequence> terms = new ShiftAddXorSignedStringMap(
fileLinesCollection.iterator(),
new LcpMonotoneMinimalPerfectHashFunction<CharSequence>(
fileLinesCollection, TransformationStrategies.prefixFreeUtf16()));
- BinIO.storeObject(terms, termmapFile);
+ BinIO.storeObject(terms, termmapFile);
+ if(bloomFilterFile != null) {
+ BloomFilter<Void> bloomFilter = BloomFilter.create(terms.size64());
+ for(MutableString term : fileLinesCollection) {
+ bloomFilter.add(term);
+ }
+ BinIO.storeObject(bloomFilter, bloomFilterFile);
+ }
}
/**
@@ -495,8 +517,23 @@
*/
protected IntArrayList documentSizesInRAM;
+ /**
+ * If a request was made to compress the index (combine all sub-indexes
+ * into a new head) this flag will be set to true. The operation will be
+ * performed on the indexing thread at the first opportunity. At that point
+ * the flag will be reset to false.
+ */
+ protected volatile boolean indexCompressionRequested = false;
/**
+ * If a request was made to write the in-RAM index data to disk this flag
+ * will be set to true. The operation will be performed on the indexing
+ * thread at the first opportunity. At that point the flag will be reset to
+ * false.
+ */
+ protected volatile boolean tailWriteRequested = false;
+
+ /**
* Creates a new AtomicIndex
*
* @param parent the {@link MimirIndex} containing this atomic index.
@@ -538,7 +575,6 @@
} else {
// new index creation
indexDirectory.mkdirs();
- documentPointer = 0;
subIndexes = new ArrayList<AtomicIndex.MG4JIndex>();
}
indexCluster = openIndexCluster(subIndexes, termProcessor);
@@ -560,16 +596,7 @@
public boolean hasDirectIndex(){
return hasDirectIndex;
}
-
- public void indexDocument(Document document) {
- //TODO
-
- // write to documents queue
-
- // convert to index data in RAM
-
- }
-
+
/**
* Starts a new MG4J batch. First time around this will be the head,
* subsequent calls will start a new tail.
@@ -671,7 +698,7 @@
}
pw.close();
generateTermMap(new File(mg4jBasename + DiskBasedIndex.TERMS_EXTENSION),
- new File(mg4jBasename + DiskBasedIndex.TERMMAP_EXTENSION));
+ new File(mg4jBasename + DiskBasedIndex.TERMMAP_EXTENSION), null);
// write the bloom filter
BinIO.storeObject(termFilter,
new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION));
@@ -739,9 +766,12 @@
}
MG4JIndex newIndexData = new MG4JIndex(newIndex, newTailDir, termFilter,
documentsInRAM, numTermsInRAM, postingsInRam, occurrencesInRAM,
maxCount);
- subIndexes.add(newIndexData);
- indexCluster = openIndexCluster(subIndexes, termProcessor);
-
+ // modify internal state
+ synchronized(this) {
+ subIndexes.add(newIndexData);
+ indexCluster = openIndexCluster(subIndexes, termProcessor);
+ }
+
if(hasDirectIndex) {
// dump new direct tail (invert the tail just written)
// merge new direct tail into direct index cluster
@@ -754,27 +784,158 @@
}
/**
- * Combines all the currently existing tails into the head, generating
a new
+ * Combines all the currently existing sub-indexes, generating a new
* head index.
+ * @throws IndexException
+ * @throws IOException
*/
- public void combineTails() {
- // TODO
+ protected void compressIndex() throws IndexException, IOException {
+ File headDirNew = new File(indexDirectory, HEAD_FILE_NAME +
HEAD_NEW_EXT);
+ // make a local copy of the sub-indexes
+ List<MG4JIndex> indexesToMerge =
+ new ArrayList<AtomicIndex.MG4JIndex>(subIndexes);
+ if(!headDirNew.mkdir()) {
+ throw new IndexException("Could not create new head directory at "
+
+ headDirNew.getAbsolutePath() + "!");
+ }
- // create new head directory
- // start combining the head, and each of the tails, writing out to
the
- // new head dir.
+ Map<Component,Coding> codingFlags =
+ CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX;
+ String outputBaseName = new File(headDirNew, name).getAbsolutePath();
+ String[] inputBaseNames = new String[indexesToMerge.size()];
+ for(int i = 0; i < inputBaseNames.length; i++) {
+ inputBaseNames[i] = new File(indexesToMerge.get(i).indexDir, name)
+ .getAbsolutePath();
+ }
+ try {
+ new Concatenate(
+ IOFactory.FILESYSTEM_FACTORY,
+ outputBaseName,
+ inputBaseNames,
+ false, // metadataOnly
+ Combine.DEFAULT_BUFFER_SIZE,
+ codingFlags,
+ IndexType.QUASI_SUCCINCT,
+ true, // skips
+ // BitStreamIndex.DEFAULT_QUANTUM,
+ // replaced with optimised automatic calculation
+ -5,
+ BitStreamIndex.DEFAULT_HEIGHT,
+ SkipBitStreamIndexWriter.DEFAULT_TEMP_BUFFER_SIZE,
+ ProgressLogger.DEFAULT_LOG_INTERVAL).run();
+ // generate term map
+ generateTermMap(new File(outputBaseName +
DiskBasedIndex.TERMS_EXTENSION),
+ new File(outputBaseName + DiskBasedIndex.TERMMAP_EXTENSION),
+ new File(outputBaseName + DocumentalCluster.BLOOM_EXTENSION));
+
+
+
+ } catch(Exception e) {
+ throw new IndexException("Exception while combining sub-indexes", e);
+ }
+
+ // update the internal state
+ synchronized(this) {
+ // remove the indexes that were merged
+ subIndexes.removeAll(indexesToMerge);
+ // insert the new head at the front of the list
+ File headDir = new File(indexDirectory, HEAD_FILE_NAME);
+ File headDirOld = new File(indexDirectory, HEAD_FILE_NAME +
HEAD_OLD_EXT);
+ if(headDir.exists() && headDir.renameTo(headDirOld)){
+ if(headDirNew.renameTo(headDir)) {
+ subIndexes.add(0, openSubIndex(HEAD_FILE_NAME));
+ indexCluster = openIndexCluster(subIndexes, termProcessor);
+
+ // clean-up: delete old head, used-up tails
+ if(!gate.util.Files.rmdir(headDirOld)) {
+ throw new IndexException(
+ "Could not fully delete old sub-index at: " + headDirOld);
+ }
+ for(MG4JIndex aSubIndex : indexesToMerge) {
+ if(!aSubIndex.indexDir.equals(headDir)) {
+ if(!gate.util.Files.rmdir(aSubIndex.indexDir)){
+ throw new IndexException(
+ "Could not fully delete old sub-index at: " +
+ aSubIndex.indexDir);
+ }
+ }
+ }
+ } else {
+ throw new IndexException("Cold not rename new head at " +
+ headDirNew.getAbsolutePath() + " to " + headDir);
+ }
+ } else {
+ throw new IndexException("Cold not rename head at " +
+ headDir.getAbsolutePath() + " to " + headDirOld);
+ }
+ }
}
- public IndexReader getIndexReader() {
- // TODO
- return null;
+ /**
+ * Instructs this index to dump to disk all the in-RAM index data at
the first
+ * opportunity.
+ */
+ public void requestDumpToDisk() {
+ tailWriteRequested = true;
}
-
+ public void requestIndexCompression() {
+ indexCompressionRequested = true;
+ }
+
/**
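+ * Opens one sub-index, specified as a directory inside this Atomic
Index's
+ * index directory.
+ * @param subIndexDirname the name of the sub-index directory, resolved against this index's directory
+ * @return the data for the newly opened sub-index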
+ * @throws IOException
+ * @throws IndexException
+ */
+ protected MG4JIndex openSubIndex(String subIndexDirname) throws
IOException, IndexException {
+ Index newIndex = null;
+ File subIndexDir = new File(indexDirectory, subIndexDirname);
+ String mg4jBasename = new File(subIndexDir, name).getAbsolutePath();
+ try {
+ try{
+ newIndex = Index.getInstance(mg4jBasename + "?" +
+ UriKeys.MAPPED.name().toLowerCase() + "=1;");
+ } catch(IOException e) {
+ // memory mapping failed
+ logger.info("Memory mapping failed for index " + mg4jBasename
+ + ". Loading as file index instead");
+ // now try to just open it as an on-disk index
+ newIndex = Index.getInstance(mg4jBasename, true, true);
+ }
+ } catch(ConfigurationException | ClassNotFoundException | SecurityException
+ | InstantiationException | IllegalAccessException
+ | InvocationTargetException | NoSuchMethodException
+ | URISyntaxException e) {
+ throw new IndexException("Could not open the sub-index at" +
+ mg4jBasename , e);
+ }
+ File bloomFile = new File(mg4jBasename +
DocumentalCluster.BLOOM_EXTENSION);
+ BloomFilter<Void> termFilter = null;
+ try {
+ if(bloomFile.exists()) {
+ termFilter = (BloomFilter<Void>) BinIO.loadObject(bloomFile);
+ }
+ } catch(ClassNotFoundException e) {
+ // this should never happen. If it does, it's not fatal
+ logger.warn("Exception wile loading stre Bloom Filter", e);
+ }
+ MG4JIndex newIndexData = new MG4JIndex(newIndex, subIndexDir, termFilter,
+ newIndex.numberOfDocuments,
+ newIndex.numberOfTerms,
+ newIndex.numberOfPostings,
+ newIndex.numberOfOccurrences,
+ newIndex.maxCount);
+ return newIndexData;
+ }
+
+ /**
* Runnable implementation: the logic of this run method is simply
indexing
* documents queued to the input queue. To stop it, send a
* {@link GATEDocument#END_OF_QUEUE} value to the input queue.
@@ -797,6 +958,13 @@
writeCurrentTail();
}
outputQueue.put(aDocument);
+
+ if(indexCompressionRequested) {
+ compressIndex();
+ }
+ if(tailWriteRequested) {
+ writeCurrentTail();
+ }
}
// we're done
writeCurrentTail();
@@ -956,14 +1124,6 @@
this.occurrencesPerBatch = occurrencesPerBatch;
}
- public boolean isHasDirectIndex() {
- return hasDirectIndex;
- }
-
- public void setHasDirectIndex(boolean hasDirectIndex) {
- this.hasDirectIndex = hasDirectIndex;
- }
-
public File getIndexDirectory() {
return indexDirectory;
}
Modified:
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java
===================================================================
---
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java
2013-12-23 16:18:08 UTC (rev 17205)
+++
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java
2013-12-24 16:30:52 UTC (rev 17206)
@@ -996,7 +996,7 @@
combineBatches(inputBasename, getGlobalFile("").getAbsolutePath());
// save the termMap
AtomicIndex.generateTermMap(getGlobalFile(DiskBasedIndex.TERMS_EXTENSION),
- getGlobalFile(DiskBasedIndex.TERMMAP_EXTENSION));
+ getGlobalFile(DiskBasedIndex.TERMMAP_EXTENSION), null);
// closing completed
closingProgress = 1;
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java
2013-12-23 16:18:08 UTC (rev 17205)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java
2013-12-24 16:30:52 UTC (rev 17206)
@@ -66,7 +66,7 @@
// and generate the new one
File termsFile = new File(URI.create(indexUri.toString()
+ DiskBasedIndex.TERMS_EXTENSION));
- AtomicIndex.generateTermMap(termsFile, termMapFile);
+ AtomicIndex.generateTermMap(termsFile, termMapFile, null);
} else {
throw new IOException("Could not rename old termmap file (" +
termMapFile.getAbsolutePath() + ").");
Modified: mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java
===================================================================
--- mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java
2013-12-23 16:18:08 UTC (rev 17205)
+++ mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java
2013-12-24 16:30:52 UTC (rev 17206)
@@ -265,6 +265,28 @@
"=================================\n");
}
+ // compress the index
+ ati.requestIndexCompression();
+ System.out.println("=================================\n" +
+ "Compressing index.\n" +
+ "=================================\n");
+ Thread.sleep(5000);
+
+ // and search one last time
+ Index index = ati.getIndex();
+ int resDocs = 0;
+ if(index != null) {
+ IndexIterator indexIter = index.getReader().documents("patent");
+ long docId = indexIter.nextDocument();
+ while(docId != DocumentIterator.END_OF_LIST) {
+ resDocs ++;
+ docId = indexIter.nextDocument();
+ }
+ }
+ System.out.println("=================================\n" +
+ "Matched " + resDocs + " documents.\n" +
+ "=================================\n");
+
ati.close();
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Rapidly troubleshoot problems before they affect your business. Most IT
organizations don't have a clear picture of how application performance
affects their revenue. With AppDynamics, you get 100% visibility into your
Java,.NET, & PHP application. Start your 15-day FREE TRIAL of AppDynamics Pro!
http://pubads.g.doubleclick.net/gampad/clk?id=84349831&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs