Revision: 17206
          http://sourceforge.net/p/gate/code/17206
Author:   valyt
Date:     2013-12-24 16:30:52 +0000 (Tue, 24 Dec 2013)
Log Message:
-----------
First sketch implementation for live-compressing an index while it's indexing 
(and service searches), all at the same time.

Modified Paths:
--------------
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
    
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java
    mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java
    mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 
2013-12-23 16:18:08 UTC (rev 17205)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java 
2013-12-24 16:30:52 UTC (rev 17206)
@@ -19,6 +19,7 @@
 import gate.mimir.MimirIndex;
 import gate.mimir.index.mg4j.GATEDocument;
 import gate.util.GateRuntimeException;
+import it.unimi.di.big.mg4j.index.BitStreamIndex;
 import it.unimi.di.big.mg4j.index.CompressionFlags;
 import it.unimi.di.big.mg4j.index.DiskBasedIndex;
 import it.unimi.di.big.mg4j.index.Index;
@@ -26,14 +27,21 @@
 import it.unimi.di.big.mg4j.index.IndexWriter;
 import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
 import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
+import it.unimi.di.big.mg4j.index.SkipBitStreamIndexWriter;
 import it.unimi.di.big.mg4j.index.TermProcessor;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
+import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
 import it.unimi.di.big.mg4j.index.Index.UriKeys;
 import it.unimi.di.big.mg4j.index.cluster.ContiguousDocumentalStrategy;
 import it.unimi.di.big.mg4j.index.cluster.DocumentalCluster;
 import it.unimi.di.big.mg4j.index.cluster.DocumentalConcatenatedCluster;
 import it.unimi.di.big.mg4j.io.IOFactory;
+import it.unimi.di.big.mg4j.tool.Combine;
+import it.unimi.di.big.mg4j.tool.Concatenate;
 import it.unimi.di.big.mg4j.tool.Scan;
+import it.unimi.di.big.mg4j.tool.Combine.IndexType;
 import it.unimi.dsi.big.io.FileLinesCollection;
+import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
 import it.unimi.dsi.big.util.ShiftAddXorSignedStringMap;
 import it.unimi.dsi.big.util.StringMap;
 import it.unimi.dsi.bits.Fast;
@@ -49,6 +57,7 @@
 import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
 import it.unimi.dsi.io.OutputBitStream;
 import it.unimi.dsi.lang.MutableString;
+import it.unimi.dsi.logging.ProgressLogger;
 import it.unimi.dsi.sux4j.mph.LcpMonotoneMinimalPerfectHashFunction;
 import it.unimi.dsi.util.BloomFilter;
 import it.unimi.dsi.util.Properties;
@@ -64,6 +73,7 @@
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.commons.configuration.ConfigurationException;
@@ -259,14 +269,14 @@
     protected File indexDir;
     protected Index index;
     protected BloomFilter<Void> termFilter;
-    int numberOfDocuments;
-    int numberOfTerms;
+    long numberOfDocuments;
+    long numberOfTerms;
     long numberOfPostings;
     long numberOfOccurences;
     int maxCount;
     public MG4JIndex(Index index, File indexDir, 
         BloomFilter<Void> termFilter,
-        int numberOfDocuments, int numberOfTerms, long numberOfPostings,
+        long numberOfDocuments, long numberOfTerms, long numberOfPostings,
         long numberOfOccurences, int maxCount) {
       super();
       this.index = index;
@@ -283,18 +293,30 @@
   /**
    * Given a terms file (text file with one term per line) this method 
generates
    * the corresponding termmap file (binary representation of a StringMap).
+   * Optionally, a {@link BloomFilter} can also be generated, if the suitable 
+   * target file is provided.
    * @param termsFile the input file
    * @param termmapFile the output file
+   * @param bloomFilterFile if not null, the file to be used for writing
+   * the {@link BloomFilter} for the index.
    * @throws IOException 
    */
-  public static void generateTermMap(File termsFile, File termmapFile) throws 
IOException {
+  public static void generateTermMap(File termsFile, File termmapFile, 
+      File bloomFilterFile) throws IOException {
     FileLinesCollection fileLinesCollection =
         new FileLinesCollection(termsFile.getAbsolutePath(), "UTF-8");
       StringMap<CharSequence> terms = new ShiftAddXorSignedStringMap(
         fileLinesCollection.iterator(),
         new LcpMonotoneMinimalPerfectHashFunction<CharSequence>(
           fileLinesCollection, TransformationStrategies.prefixFreeUtf16()));
-      BinIO.storeObject(terms, termmapFile);    
+      BinIO.storeObject(terms, termmapFile);
+      if(bloomFilterFile != null) {
+        BloomFilter<Void> bloomFilter = BloomFilter.create(terms.size64());
+        for(MutableString term : fileLinesCollection) {
+          bloomFilter.add(term);
+        }
+        BinIO.storeObject(bloomFilter, bloomFilterFile);
+      }
   }  
 
   /**
@@ -495,8 +517,23 @@
    */
   protected IntArrayList documentSizesInRAM;
   
+  /**
+   * If a request was made to compress the index (combine all sub-indexes 
+   * into a new head) this flag will be set to true. The operation will be 
+   * performed on the indexing thread at the first opportunity. At that point 
+   * the flag will be reset to false.
+   */
+  protected volatile boolean indexCompressionRequested = false;
   
   /**
+   * If a request was made to write the in-RAM index data to disk this flag 
+   * will be set to true. The operation will be performed on the indexing
+   * thread at the first opportunity.  At that point the flag will be reset to 
+   * false.  
+   */
+  protected volatile boolean tailWriteRequested = false;
+  
+  /**
    * Creates a new AtomicIndex
    * 
    * @param parent the {@link MimirIndex} containing this atomic index.
@@ -538,7 +575,6 @@
     } else {
       // new index creation
       indexDirectory.mkdirs();
-      documentPointer = 0;
       subIndexes = new ArrayList<AtomicIndex.MG4JIndex>();
     }
     indexCluster = openIndexCluster(subIndexes, termProcessor);
@@ -560,16 +596,7 @@
        public boolean hasDirectIndex(){
          return hasDirectIndex;
        }
-       
-       public void indexDocument(Document document) {
-         //TODO
-         
-         // write to documents queue
-         
-         // convert to index data in RAM
-         
-       }
-       
+               
        /**
         * Starts a new MG4J batch. First time around this will be the head, 
         * subsequent calls will start a new tail.
@@ -671,7 +698,7 @@
     }
     pw.close();
     generateTermMap(new File(mg4jBasename + DiskBasedIndex.TERMS_EXTENSION),
-        new File(mg4jBasename + DiskBasedIndex.TERMMAP_EXTENSION));
+        new File(mg4jBasename + DiskBasedIndex.TERMMAP_EXTENSION), null);
     // write the bloom filter
     BinIO.storeObject(termFilter, 
         new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION)); 
@@ -739,9 +766,12 @@
     }
     MG4JIndex newIndexData = new MG4JIndex(newIndex, newTailDir, termFilter, 
         documentsInRAM, numTermsInRAM, postingsInRam, occurrencesInRAM, 
maxCount);
-    subIndexes.add(newIndexData);
-    indexCluster = openIndexCluster(subIndexes, termProcessor);
-    
+    // modify internal state
+    synchronized(this) {
+      subIndexes.add(newIndexData);
+      indexCluster = openIndexCluster(subIndexes, termProcessor);      
+    }
+
          if(hasDirectIndex) {
            // dump new direct tail (invert the tail just written)
            // merge new direct tail into direct index cluster
@@ -754,27 +784,158 @@
        }
        
        /**
-        * Combines all the currently existing tails into the head, generating 
a new
+        * Combines all the currently existing sub-indexes, generating a new
         * head index.
+        * @throws IndexException 
+        * @throws IOException 
         */
-       public void combineTails() {
-         // TODO
+       protected void compressIndex() throws IndexException, IOException {
+         File headDirNew = new File(indexDirectory, HEAD_FILE_NAME + 
HEAD_NEW_EXT);
+         // make a local copy of the sub-indexes
+         List<MG4JIndex> indexesToMerge = 
+             new ArrayList<AtomicIndex.MG4JIndex>(subIndexes);
+         if(!headDirNew.mkdir()) {
+           throw new IndexException("Could not create new head directory at " 
+ 
+               headDirNew.getAbsolutePath() +  "!"); 
+         }
          
-         // create new head directory
          
-         // start combining the head, and each of the tails, writing out to 
the 
-         // new head dir.
+         Map<Component,Coding> codingFlags = 
+             CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX;
+         String outputBaseName = new File(headDirNew, name).getAbsolutePath();
          
+         String[] inputBaseNames = new String[indexesToMerge.size()];
+         for(int i = 0; i < inputBaseNames.length; i++) {
+           inputBaseNames[i] = new File(indexesToMerge.get(i).indexDir, name)
+             .getAbsolutePath(); 
+         }
          
+         try {
+      new Concatenate(
+          IOFactory.FILESYSTEM_FACTORY,
+          outputBaseName,
+          inputBaseNames,
+          false, // metadataOnly 
+          Combine.DEFAULT_BUFFER_SIZE, 
+          codingFlags,
+          IndexType.QUASI_SUCCINCT,
+          true, // skips
+          // BitStreamIndex.DEFAULT_QUANTUM,
+          // replaced with optimised automatic calculation
+          -5, 
+          BitStreamIndex.DEFAULT_HEIGHT, 
+          SkipBitStreamIndexWriter.DEFAULT_TEMP_BUFFER_SIZE, 
+          ProgressLogger.DEFAULT_LOG_INTERVAL).run();
+      // generate term map
+      generateTermMap(new File(outputBaseName + 
DiskBasedIndex.TERMS_EXTENSION), 
+          new File(outputBaseName +  DiskBasedIndex.TERMMAP_EXTENSION),
+          new File(outputBaseName +  DocumentalCluster.BLOOM_EXTENSION));
+      
+      
+      
+    } catch(Exception e) {
+      throw new IndexException("Exception while combining sub-indexes", e);
+    }
+
+         // update the internal state
+    synchronized(this) {
+      // remove the indexes that were merged
+      subIndexes.removeAll(indexesToMerge);
+      // insert the new head at the front of the list
+      File headDir = new File(indexDirectory, HEAD_FILE_NAME);
+      File headDirOld = new File(indexDirectory, HEAD_FILE_NAME + 
HEAD_OLD_EXT);
+      if(headDir.exists() && headDir.renameTo(headDirOld)){
+        if(headDirNew.renameTo(headDir)) {
+          subIndexes.add(0, openSubIndex(HEAD_FILE_NAME));
+          indexCluster = openIndexCluster(subIndexes, termProcessor);
+          
+          // clean-up: delete old head, used-up tails
+          if(!gate.util.Files.rmdir(headDirOld)) {
+            throw new IndexException(
+                "Could not fully delete old sub-index at: " + headDirOld);
+          }
+          for(MG4JIndex aSubIndex : indexesToMerge) {
+            if(!aSubIndex.indexDir.equals(headDir)) {
+              if(!gate.util.Files.rmdir(aSubIndex.indexDir)){
+                throw new IndexException(
+                    "Could not fully delete old sub-index at: " + 
+                    aSubIndex.indexDir);
+              }              
+            }
+          }
+        } else {
+          throw new IndexException("Cold not rename new head at " + 
+              headDirNew.getAbsolutePath() + " to " + headDir);
+        }
+      } else {
+        throw new IndexException("Cold not rename head at " + 
+            headDir.getAbsolutePath() + " to " + headDirOld);
+      }
+    }
        }
        
-       public IndexReader getIndexReader() {
-         // TODO
-         return null;
+       /**
+        * Instructs this index to dump to disk all the in-RAM index data at 
the fist 
+        * opportunity.
+        */
+       public void requestDumpToDisk() {
+         tailWriteRequested = true;
        }
-
        
+       public void requestIndexCompression() {
+        indexCompressionRequested = true;
+       }
+       
        /**
+        * Opens one sub-index, specified as a directory inside this Atom 
Index's
+        * index directory.
+        * @param indexDir
+        * @return
+        * @throws IOException 
+        * @throws IndexException 
+        */
+       protected MG4JIndex openSubIndex(String subIndexDirname) throws 
IOException, IndexException {
+    Index newIndex = null;
+    File subIndexDir = new File(indexDirectory, subIndexDirname);
+    String mg4jBasename = new File(subIndexDir, name).getAbsolutePath(); 
+    try {
+      try{
+        newIndex = Index.getInstance(mg4jBasename + "?" +
+            UriKeys.MAPPED.name().toLowerCase() + "=1;");
+      } catch(IOException e) {
+        // memory mapping failed
+        logger.info("Memory mapping failed for index " + mg4jBasename
+                + ". Loading as file index instead");
+        // now try to just open it as an on-disk index
+        newIndex = Index.getInstance(mg4jBasename, true, true);
+      }
+    } catch(ConfigurationException | ClassNotFoundException | SecurityException
+        | InstantiationException | IllegalAccessException
+        | InvocationTargetException | NoSuchMethodException
+        | URISyntaxException e) {
+      throw new IndexException("Could not open the sub-index at" +
+         mg4jBasename , e);
+    }
+    File bloomFile = new File(mg4jBasename + 
DocumentalCluster.BLOOM_EXTENSION);
+    BloomFilter<Void> termFilter = null;
+    try {
+      if(bloomFile.exists()) {
+        termFilter = (BloomFilter<Void>) BinIO.loadObject(bloomFile);
+      }
+    } catch(ClassNotFoundException e) {
+      // this should never happen. If it does, it's not fatal
+      logger.warn("Exception wile loading stre Bloom Filter", e);
+    }
+    MG4JIndex newIndexData = new MG4JIndex(newIndex, subIndexDir, termFilter, 
+        newIndex.numberOfDocuments,
+        newIndex.numberOfTerms,
+        newIndex.numberOfPostings,
+        newIndex.numberOfOccurrences,
+        newIndex.maxCount);
+         return newIndexData;
+       }
+       
+       /**
         * Runnable implementation: the logic of this run method is simply 
indexing
         * documents queued to the input queue. To stop it, send a 
         * {@link GATEDocument#END_OF_QUEUE} value to the input queue.
@@ -797,6 +958,13 @@
             writeCurrentTail();
           }
           outputQueue.put(aDocument);
+          
+          if(indexCompressionRequested) {
+            compressIndex();
+          }
+          if(tailWriteRequested) {
+            writeCurrentTail();
+          }
         }
         // we're done
         writeCurrentTail();
@@ -956,14 +1124,6 @@
     this.occurrencesPerBatch = occurrencesPerBatch;
   }
 
-  public boolean isHasDirectIndex() {
-    return hasDirectIndex;
-  }
-
-  public void setHasDirectIndex(boolean hasDirectIndex) {
-    this.hasDirectIndex = hasDirectIndex;
-  }
-
   public File getIndexDirectory() {
     return indexDirectory;
   }

Modified: 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java
===================================================================
--- 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java  
    2013-12-23 16:18:08 UTC (rev 17205)
+++ 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/MimirIndexBuilder.java  
    2013-12-24 16:30:52 UTC (rev 17206)
@@ -996,7 +996,7 @@
       combineBatches(inputBasename, getGlobalFile("").getAbsolutePath());
       // save the termMap
       
AtomicIndex.generateTermMap(getGlobalFile(DiskBasedIndex.TERMS_EXTENSION), 
-        getGlobalFile(DiskBasedIndex.TERMMAP_EXTENSION));
+        getGlobalFile(DiskBasedIndex.TERMMAP_EXTENSION), null);
       // closing completed
       closingProgress = 1;
       

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java    
2013-12-23 16:18:08 UTC (rev 17205)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/util/MG4JTools.java    
2013-12-24 16:30:52 UTC (rev 17206)
@@ -66,7 +66,7 @@
         // and generate the new one
         File termsFile = new File(URI.create(indexUri.toString()
           + DiskBasedIndex.TERMS_EXTENSION));
-        AtomicIndex.generateTermMap(termsFile, termMapFile);
+        AtomicIndex.generateTermMap(termsFile, termMapFile, null);
       } else {
         throw new IOException("Could not rename old termmap file (" + 
             termMapFile.getAbsolutePath() + ").");

Modified: mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java
===================================================================
--- mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java      
2013-12-23 16:18:08 UTC (rev 17205)
+++ mimir/branches/5.0/mimir-test/src/gate/mimir/test/Scratch.java      
2013-12-24 16:30:52 UTC (rev 17206)
@@ -265,6 +265,28 @@
           "=================================\n");
     }
 
+    // compress the index
+    ati.requestIndexCompression();
+    System.out.println("=================================\n" + 
+        "Compressing index.\n" +
+        "=================================\n");
+    Thread.sleep(5000);
+    
+    // and search one last time
+    Index index = ati.getIndex();
+    int resDocs = 0;
+    if(index != null) {
+      IndexIterator indexIter = index.getReader().documents("patent");
+      long docId = indexIter.nextDocument();
+      while(docId != DocumentIterator.END_OF_LIST) {
+        resDocs ++;
+        docId = indexIter.nextDocument();
+      }
+    }
+    System.out.println("=================================\n" + 
+        "Matched " + resDocs + " documents.\n" +
+        "=================================\n");    
+    
     ati.close();
   }
   

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Rapidly troubleshoot problems before they affect your business. Most IT 
organizations don't have a clear picture of how application performance 
affects their revenue. With AppDynamics, you get 100% visibility into your 
Java,.NET, & PHP application. Start your 15-day FREE TRIAL of AppDynamics Pro!
http://pubads.g.doubleclick.net/gampad/clk?id=84349831&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to