Revision: 17260
          http://sourceforge.net/p/gate/code/17260
Author:   valyt
Date:     2014-01-30 12:52:55 +0000 (Thu, 30 Jan 2014)
Log Message:
-----------
Implemented a static method to combine a set of direct indexes (forming a lexical partition) into a single index.
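
For illustration, a minimal sketch of how the new combiner is invoked
(mirroring the call now made from compactIndex() in the patch below;
indexesToMerge and headDirNew stand in for that method's local state, and the
call site must handle the IOException and ConfigurationException it declares):

    // Sketch only, not part of the commit: combine the direct indexes of all
    // current sub-indexes into a single index under the new head directory.
    List<MG4JIndex> indexesToMerge = new ArrayList<MG4JIndex>(subIndexes);
    File headDirNew = new File(indexDirectory, HEAD_FILE_NAME + HEAD_NEW_EXT);
    combineDirectIndexes(indexesToMerge, new File(headDirNew,
        name + DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath());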

Modified Paths:
--------------
    mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java	2014-01-29 18:29:53 UTC (rev 17259)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/index/AtomicIndex.java	2014-01-30 12:52:55 UTC (rev 17260)
@@ -24,6 +24,7 @@
 import it.unimi.di.big.mg4j.index.CompressionFlags;
 import it.unimi.di.big.mg4j.index.DiskBasedIndex;
 import it.unimi.di.big.mg4j.index.Index;
+import it.unimi.di.big.mg4j.index.IndexIterator;
 import it.unimi.di.big.mg4j.index.IndexReader;
 import it.unimi.di.big.mg4j.index.IndexWriter;
 import it.unimi.di.big.mg4j.index.NullTermProcessor;
@@ -320,6 +321,25 @@
     }
     
     /**
+     * Empties all the data from this postings list, making it ready to be reused.
+     */
+    public void clear() {
+      documentPointersDifferential.clear();
+      count = 0;
+      counts.clear();
+      maxCount = 0;
+      occurrences = 0;
+      if(positions != null){
+        positions.clear();
+        lastPosition = -1;
+        sumMaxPos = 0;
+      }
+      firstDocumentPointer = -1;
+      lastDocumentPointer = -1;
+      frequency = 0;
+    }
+    
+    /**
      * Writes the data contained in this postings list to an index writer.
      * @param indexWriter
      * @throws IOException 
@@ -395,16 +415,18 @@
     protected Index directIndex;
     protected BloomFilter<Void> invertedTermFilter;
     protected BloomFilter<Void> directTermFilter;
-
+    protected String indexName;
     
     public MG4JIndex(
         File indexDir,
+        String indexName,
         Index invertedIndex,  
         BloomFilter<Void> invertedTermFilter,
         Index directIndex,
         BloomFilter<Void> directTermFilter) {
       
       this.indexDir = indexDir;
+      this.indexName = indexName;
       this.invertedIndex = invertedIndex;
       this.invertedTermFilter = invertedTermFilter;
       
@@ -525,7 +547,6 @@
     long numberOfOccurences =-1;
     int maxCount =-1;
     int indexIdx = 0;
-    IntBigList sizes = new IntBigArrayBigList();
     BloomFilter<Void> bloomFilters[] = new BloomFilter[indexes.length];
     
     for(MG4JIndex aSubIndex : subIndexes) {
@@ -545,7 +566,6 @@
         maxCount = aSubIndex.directIndex.maxCount;
       }
       bloomFilters[indexIdx] = aSubIndex.directTermFilter;
-      sizes.addAll(aSubIndex.directIndex.sizes);
       indexIdx++;
     }
     cutPointTerms[cutPointTerms.length - 1] = null;
@@ -563,7 +583,7 @@
           false, // hasPositions, 
           NullTermProcessor.getInstance(), 
           null, // field 
-          sizes, // sizes
+          null, // sizes
           null // properties
           );
   }  
@@ -754,12 +774,6 @@
   protected MutableString currentTerm;
   
   /**
-   * The current document pointer (gets incremented for each document). This is
-   * a global for all documents in the atomic index, in all the batches.
-   */
-  protected long documentPointer;
-  
-  /**
    * The number of documents currently stored in RAM.
    */
   protected int documentsInRAM;
@@ -922,10 +936,6 @@
          occurrencesInRAM = 0;
     maxDocSizeInRAM = -1;
     documentsInRAM = 0;
-    // hack to force zero-based batches. This forces us to use a merged cluster
-    // but avoids exceptions during indexing. We'll fix this if possible, after
-    // we get advice on the MG4J mailing list.
-    documentPointer = 0;
     if(termMap == null) {
       termMap = new Object2ReferenceOpenHashMap<MutableString, 
           PostingsList>(INITIAL_TERM_MAP_SIZE, Hash.FAST_LOAD_FACTOR );      
@@ -1109,6 +1119,13 @@
     // which are actually document IDs, and they have posting lists containing
     // document IDs, which are actually termIDs.
 
+    // The document pointers in RAM are zero-based, so we need to offset them
+    // by the number of documents already on disk.
+    long docsOnDisk = 0;
+    for(MG4JIndex index : subIndexes) {
+      docsOnDisk += index.invertedIndex.numberOfDocuments;
+    }
+    
     //1. invert index data in RAM
     Object2ReferenceOpenHashMap<MutableString, PostingsList> docMap = 
           new Object2ReferenceOpenHashMap<MutableString, 
@@ -1126,14 +1143,13 @@
       }
     }
     // we now read the posting lists for all the terms, in ascending term order
-    //for(int termId = 0; termId < termArray.length; termId++) {
     MutableString termMS = new MutableString();
     for(long directTermId = 0; directTermId < directTerms.size64(); directTermId++){
       String termString = directTerms.get(directTermId);
       termMS.replace(termString);
       PostingsList termPostings = termMap.get(termMS);
       if(termPostings != null) {
-        long docPointer = termPostings.firstDocumentPointer;
+        long docPointer = docsOnDisk + termPostings.firstDocumentPointer;
        for(int i = 0; i < termPostings.documentPointersDifferential.size(); i++) {
          docPointer += termPostings.documentPointersDifferential.get(i);
           int count = termPostings.counts.getInt(i);
@@ -1277,8 +1293,9 @@
         * head index.
         * @throws IndexException 
         * @throws IOException 
+        * @throws ConfigurationException 
         */
-       protected void compactIndex() throws IndexException, IOException {
+       protected void compactIndex() throws IndexException, IOException, ConfigurationException {
         File headDirNew = new File(indexDirectory, HEAD_FILE_NAME + HEAD_NEW_EXT);
          // make a local copy of the sub-indexes
          List<MG4JIndex> indexesToMerge = 
@@ -1323,41 +1340,8 @@
     }
 
     if(hasDirectIndex()) {
-      codingFlags = new HashMap<Component, Coding>(
-          CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX); 
-      codingFlags.remove(Component.POSITIONS);
-      outputBaseName = new File(headDirNew, name + 
-          DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
-      
-      inputBaseNames = new String[indexesToMerge.size()];
-      for(int i = 0; i < inputBaseNames.length; i++) {
-        inputBaseNames[i] = new File(indexesToMerge.get(i).indexDir, 
-            name + DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath(); 
-      }
-      try {
-        new Merge(
-            IOFactory.FILESYSTEM_FACTORY,
-            outputBaseName,
-            inputBaseNames,
-            false, // metadataOnly 
-            Combine.DEFAULT_BUFFER_SIZE, 
-            codingFlags,
-            IndexType.QUASI_SUCCINCT,
-            true, // skips
-            // BitStreamIndex.DEFAULT_QUANTUM,
-            // replaced with optimised automatic calculation
-            -5, 
-            BitStreamIndex.DEFAULT_HEIGHT, 
-            SkipBitStreamIndexWriter.DEFAULT_TEMP_BUFFER_SIZE, 
-            ProgressLogger.DEFAULT_LOG_INTERVAL).run();
-        // generate term map
-        generateTermMap(new File(outputBaseName + DiskBasedIndex.TERMS_EXTENSION), 
-            new File(outputBaseName +  DiskBasedIndex.TERMMAP_EXTENSION),
-            new File(outputBaseName +  DocumentalCluster.BLOOM_EXTENSION));
-      } catch(Exception e) {
-        throw new IndexException("Exception while combining direct 
sub-indexes", 
-            e);
-      }
+      combineDirectIndexes(indexesToMerge, new File(headDirNew, name + 
+          DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath());
     }    
          
          // update the internal state
@@ -1406,6 +1390,109 @@
        }
        
        /**
+        * Given a set of direct indexes (MG4J indexes with counts but no
+        * positions) that form a lexical cluster, this method produces a single
+        * output index containing the data from all the input indexes.
+        * @param inputIndexes
+        * @param outputBasename
+        * @throws IOException 
+        * @throws ConfigurationException 
+        */
+       protected static void combineDirectIndexes(List<MG4JIndex> inputIndexes, 
+           String outputBasename) throws IOException, ConfigurationException {
+         
+         long noOfDocuments = 0;
+         long noOfTerms = 0;
+         for(MG4JIndex index : inputIndexes) {
+           noOfDocuments += index.directIndex.numberOfDocuments;
+           noOfTerms += index.directIndex.numberOfTerms;
+         }
+         
+         // open the output writer
+    // copy the default compression flags, and remove positions
+    Map<Component, Coding> flags = new HashMap<Component, Coding>(
+        CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
+    flags.remove(Component.POSITIONS);
+    QuasiSuccinctIndexWriter outputIndexWriter =
+        new QuasiSuccinctIndexWriter(
+            IOFactory.FILESYSTEM_FACTORY,
+            outputBasename, 
+            noOfDocuments,
+            Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
+            QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
+            flags,
+            ByteOrder.nativeOrder());
+    
+    BloomFilter<Void> bloomFilter = BloomFilter.create(noOfTerms);
+    PrintWriter termsPw = new PrintWriter( 
+        new OutputStreamWriter(new FastBufferedOutputStream(
+            new FileOutputStream(outputBasename + DiskBasedIndex.TERMS_EXTENSION), 
+            64 * 1024), 
+        "UTF-8" ));
+    
+    // write the index
+    long occurrences = 0;
+    int maxCount = 0;
+    PostingsList postingsList = new PostingsList(false);
+    for(MG4JIndex inputIndex : inputIndexes) {
+      IndexReader inputReader = inputIndex.directIndex.getReader();
+      File directTermsFile = new File(inputIndex.indexDir, 
+          inputIndex.indexName + DIRECT_INDEX_NAME_SUFFIX + 
+          DiskBasedIndex.TERMS_EXTENSION);
+      FileLinesCollection.FileLinesIterator termsIter =
+          new FileLinesCollection(directTermsFile.getAbsolutePath(), 
+          "UTF-8").iterator();
+      MutableString termMS = null;
+      IndexIterator inputIterator = inputReader.nextIterator();
+      while(inputIterator != null && termsIter.hasNext()) {
+        termMS = termsIter.next();
+        bloomFilter.add(termMS);
+        termMS.println(termsPw);
+        long docPointer = inputIterator.nextDocument();
+        while(docPointer !=  IndexIterator.END_OF_LIST) {
+          postingsList.newDocumentPointer(docPointer);
+          postingsList.setCount(inputIterator.count());
+          docPointer = inputIterator.nextDocument();
+        }
+        postingsList.flush();
+        occurrences += postingsList.occurrences;
+        if ( maxCount < postingsList.maxCount ) maxCount = postingsList.maxCount;
+        postingsList.write(outputIndexWriter);
+        postingsList.clear();
+        inputIterator = inputReader.nextIterator();
+      }
+      inputReader.close();
+    }
+    outputIndexWriter.close();
+    termsPw.close();
+    generateTermMap(new File(outputBasename + DiskBasedIndex.TERMS_EXTENSION),
+        new File(outputBasename + DiskBasedIndex.TERMMAP_EXTENSION), null);
+    // write the bloom filter
+    BinIO.storeObject(bloomFilter, 
+        new File(outputBasename + DocumentalCluster.BLOOM_EXTENSION));
+    // direct indexes don't store positions, so sizes are not needed
+
+    // write the index properties
+    Properties properties = outputIndexWriter.properties();
+    properties.setProperty(Index.PropertyKeys.TERMPROCESSOR, 
+        ObjectParser.toSpec(NullTermProcessor.getInstance()));
+    properties.setProperty( Index.PropertyKeys.SIZE,  
+        outputIndexWriter.writtenBits());
+    // -1 means unknown
+    properties.setProperty( Index.PropertyKeys.MAXDOCSIZE, -1);
+    properties.setProperty( Index.PropertyKeys.MAXCOUNT, maxCount );
+    properties.setProperty( Index.PropertyKeys.OCCURRENCES, occurrences);
+    Scan.saveProperties( IOFactory.FILESYSTEM_FACTORY, properties, 
+        outputBasename + DiskBasedIndex.PROPERTIES_EXTENSION );
+    
+    // write stats
+    PrintStream statsPs = new PrintStream(new File(outputBasename + 
+        DiskBasedIndex.STATS_EXTENSION));
+    outputIndexWriter.printStats(statsPs);
+    statsPs.close();
+       }
+       
+       /**
         * Instructs this index to dump to disk all the in-RAM index data at the
         * first opportunity.
         * @return a future value that, upon completion, will return the number of
@@ -1474,39 +1561,41 @@
      logger.warn("Exception while loading the Bloom filter", e);
     }
     
-    // open direct index
     Index directIndex = null;
-    mg4jBasename = new File(subIndexDir, name + 
-        DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath(); 
-    try {
-      try{
-        directIndex = Index.getInstance(
-            mg4jBasename + "?" + UriKeys.MAPPED.name().toLowerCase() + "=1;", 
-            true, true);
-      } catch(IOException e) {
-        // memory mapping failed
-        logger.info("Memory mapping failed for index " + mg4jBasename
-                + ". Loading as file index instead");
-        // now try to open it as a plain on-disk index
-        directIndex = Index.getInstance(mg4jBasename, true, true);
-      }
-    } catch(Exception e) {
-      throw new IndexException("Could not open the sub-index at" + 
mg4jBasename , e);
-    }
-    //read the Bloom filter 
-    bloomFile = new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION);
     BloomFilter<Void> directTermFilter = null;
-    try {
-      if(bloomFile.exists()) {
-        directTermFilter = (BloomFilter<Void>) BinIO.loadObject(bloomFile);
+    if(hasDirectIndex) {
+      // open direct index
+      mg4jBasename = new File(subIndexDir, name + 
+          DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
+      try {
+        try{
+          directIndex = Index.getInstance(
+              mg4jBasename + "?" + UriKeys.MAPPED.name().toLowerCase() + "=1;", 
+              true, false);
+        } catch(IOException e) {
+          // memory mapping failed
+          logger.info("Memory mapping failed for index " + mg4jBasename
+                  + ". Loading as file index instead");
+          // now try to open it as a plain on-disk index
+          directIndex = Index.getInstance(mg4jBasename, true, false);
+        }
+      } catch(Exception e) {
+        throw new IndexException("Could not open the sub-index at " + mg4jBasename, e);
       }
-    } catch(ClassNotFoundException e) {
-      // this should never happen. If it does, it's not fatal
-      logger.warn("Exception while loading the Bloom filter", e);
+      //read the Bloom filter 
+      bloomFile = new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION);
+      
+      try {
+        if(bloomFile.exists()) {
+          directTermFilter = (BloomFilter<Void>) BinIO.loadObject(bloomFile);
+        }
+      } catch(ClassNotFoundException e) {
+        // this should never happen. If it does, it's not fatal
+        logger.warn("Exception while loading the Bloom filter", e);
+      }
     }
     
-    
-    MG4JIndex newIndexData = new MG4JIndex(subIndexDir, 
+    MG4JIndex newIndexData = new MG4JIndex(subIndexDir, name,
         invertedIndex, invertedTermFilter, 
         directIndex, directTermFilter);
          return newIndexData;
@@ -1664,7 +1753,6 @@
       documentSizesInRAM.add(docLength);
     } finally {
       documentEnding(gateDocument);
-      documentPointer++;
       documentsInRAM++;
     }
   }
@@ -1707,7 +1795,9 @@
       termMap.put( currentTerm.copy(), termPostings = new PostingsList(true));
     }
     //add the current posting to the current postings list
-    termPostings.newDocumentPointer(documentPointer);
+    // In a documental cluster, each sub-index is zero-based. This is why we use
+    // the local document pointer here.
+    termPostings.newDocumentPointer(documentsInRAM);
     //this is needed so that we don't increment the number of occurrences
     //for duplicate values.
     if(termPostings.checkPosition(tokenPosition)){
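
For context, the core loop of the new combineDirectIndexes() method condenses
to the following sketch (using the names from the patch; term file reading,
Bloom filter, and properties output are omitted). The new PostingsList.clear()
method is what allows one postings list to be reused for every term instead of
allocating a fresh one per term:

    // Condensed sketch: stream each input index's posting lists into one
    // output writer, reusing a single PostingsList between terms.
    PostingsList postingsList = new PostingsList(false); // counts, no positions
    for(MG4JIndex inputIndex : inputIndexes) {
      IndexReader inputReader = inputIndex.directIndex.getReader();
      IndexIterator inputIterator = inputReader.nextIterator();
      while(inputIterator != null) {
        long docPointer = inputIterator.nextDocument();
        while(docPointer != IndexIterator.END_OF_LIST) {
          postingsList.newDocumentPointer(docPointer);
          postingsList.setCount(inputIterator.count());
          docPointer = inputIterator.nextDocument();
        }
        postingsList.flush();
        postingsList.write(outputIndexWriter);
        postingsList.clear(); // empty and reuse for the next term
        inputIterator = inputReader.nextIterator();
      }
      inputReader.close();
    }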
