Revision: 17251
          http://sourceforge.net/p/gate/code/17251
Author:   valyt
Date:     2014-01-28 16:57:52 +0000 (Tue, 28 Jan 2014)
Log Message:
-----------
Live-compactable document collection seems to work.

Modified Paths:
--------------
    mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
    
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java

Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        
2014-01-28 12:43:58 UTC (rev 17250)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java        
2014-01-28 16:57:52 UTC (rev 17251)
@@ -46,9 +46,11 @@
 import java.util.TimerTask;
 import java.util.TreeSet;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipException;
 
 import org.apache.log4j.Logger;
 
@@ -410,6 +412,9 @@
     return futures;
   }
   
+  public void compactDocumentCollection() throws ZipException, IOException, 
IndexException {
+    documentCollection.compact();
+  }
   
   /**
    * Notifies this index that more occurrences have been stored in RAM by one 
of

Modified: 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
===================================================================
--- 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
       2014-01-28 12:43:58 UTC (rev 17250)
+++ 
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
       2014-01-28 16:57:52 UTC (rev 17251)
@@ -57,13 +57,7 @@
    */
   protected static final int DOCUMENT_DATA_CACHE_SIZE = 100;
   
-  /**
-   * The number of documents kept in memory until a new zip file is written. 
As 
-   * new documents are submitted, they get written to the currently open zip 
-   * file but they cannot be read from the file. To account for this, we keep 
-   * them in memory, in the {@link #inputBuffer} structure.
-   */
-  protected static final int INPUT_BUFFER_SIZE = 100;
+
   
   /**
    * Class representing one of the collection (zip) files.
@@ -90,7 +84,7 @@
      * </ul>   
      */
     protected static final Pattern MIMIR_COLLECTION_PATTERN = Pattern.compile(
-        "\\Q" + MIMIR_COLLECTION_BASENAME + "\\E((\\d+)(?:-([a-zA-Z]+))?)\\Q"+
+        "\\Q" + MIMIR_COLLECTION_BASENAME + 
"\\E((\\d+)(?:-([-0-9a-zA-Z]+))?)\\Q"+
         MIMIR_COLLECTION_EXTENSION + "\\E");
     
     protected static FilenameFilter FILENAME_FILTER = new FilenameFilter() {
@@ -100,6 +94,9 @@
       }
     };
     
+    
+    protected File file;
+    
          protected ZipFile zipFile;
          
          protected long firstEntry;
@@ -147,8 +144,9 @@
     }
     
          public CollectionFile(File file) throws ZipException, IOException {
-           
-      zipFile = new ZipFile(file);      
+           this.file = file;
+      zipFile = new ZipFile(file);
+      collectionFileNumber = getCollectionFileNumber(file.getName());
       Enumeration<? extends ZipEntry> entries = zipFile.entries();
       firstEntry = Long.MAX_VALUE;
       lastEntry = -1;
@@ -209,8 +207,23 @@
    * Class that handles the creation of collection files.
    */
   protected static class CollectionFileWriter {
-
     /**
+     * The number of documents kept in memory until a new zip file is written. 
As 
+     * new documents are submitted, they get written to the currently open zip 
+     * file but they cannot be read from the file. To account for this, we 
keep 
+     * them in memory, in the {@link #inputBuffer} structure.
+     */
+    protected static final int INPUT_BUFFER_SIZE = 10;
+    
+    
+    /**
+     * Document data objects that have been written to the zip file currently 
+     * being created and have to be kept in RAM until the file is closed and 
can 
+     * be open in read mode. 
+     */
+    protected Long2ObjectLinkedOpenHashMap<DocumentData> inputBuffer;
+    
+    /**
      * The zip file managed by this collection.
      */
     protected ZipOutputStream zipOuputStream;
@@ -251,6 +264,7 @@
       }
       currentEntries = 0;
       currentLength = 0;
+      inputBuffer = new Long2ObjectLinkedOpenHashMap<DocumentData>();
     }
     
     /**
@@ -262,7 +276,7 @@
      * 
      * @throws IOException
      */
-    public boolean writeDocumentData(String entryName, DocumentData document) 
throws IOException {
+    public boolean writeDocumentData(long documentId, DocumentData document) 
throws IOException {
       //write the new document to the byte array
       ObjectOutputStream objectOutStream = new ObjectOutputStream(byteArrayOS);
       objectOutStream.writeObject(document);
@@ -270,10 +284,11 @@
 
       // check if this will take us over size
       if(currentLength + byteArrayOS.size() > ZIP_FILE_MAX_SIZE ||
-         currentEntries >= ZIP_FILE_MAX_ENTRIES) return false;
+         currentEntries >= ZIP_FILE_MAX_ENTRIES ||
+         inputBuffer.size() >= INPUT_BUFFER_SIZE) return false;
       
       // create a new entry in the current zip file
-      ZipEntry entry = new ZipEntry(entryName);
+      ZipEntry entry = new ZipEntry(Long.toString(documentId));
       zipOuputStream.putNextEntry(entry);
       //write the data
       byteArrayOS.writeTo(zipOuputStream);
@@ -283,11 +298,14 @@
       //clean up the byte array for next time
       byteArrayOS.reset();
       currentEntries++;
+      // save the document data to the input buffer
+      inputBuffer.put(documentId, document);
       return true;
     }
 
     public void close() throws IOException {
       if(zipOuputStream != null) zipOuputStream.close();
+      inputBuffer.clear();
     }
   }
   
@@ -312,12 +330,7 @@
    */
   protected Long2ObjectLinkedOpenHashMap<DocumentData> documentCache;
   
-  /**
-   * Document data objects that have been written to the zip file currently 
-   * being created and have to be kept in RAM until the file is closed and can 
-   * be open in read mode. 
-   */
-  protected Long2ObjectLinkedOpenHashMap<DocumentData> inputBuffer;
+
   
   /**
    * Flag that gets set to true when the collection is closed (and blocks all 
@@ -383,7 +396,7 @@
     // prepare for writing
     nextDocumentId = collectionFiles.isEmpty() ? 0 : 
         (collectionFiles.get(collectionFiles.size() - 1).lastEntry + 1);
-    inputBuffer = new Long2ObjectLinkedOpenHashMap<DocumentData>();
+    
   }
   
 
@@ -401,23 +414,27 @@
     if(collectionFiles.isEmpty() ||
        documentID > collectionFiles.get(collectionFiles.size() - 1).lastEntry) 
{
       // it's a new document that's not yet available from the zip files
-      documentData = inputBuffer.get(documentID);
+      documentData = collectionFileWriter != null ?
+          collectionFileWriter.inputBuffer.get(documentID):
+          null;
     } else {
       // it's an old document. Try the cache first
       documentData = documentCache.getAndMoveToFirst(documentID);
       if(documentData == null) {
         // cache miss: we need to actually load it
         //locate the right zip file
-        files: for(CollectionFile aColFile : collectionFiles) {
-          if(aColFile.containsDocument(documentID)) {
-            // we found the collection file containing the document
-            documentData = aColFile.getDocumentData(nextDocumentId);
-            documentCache.putAndMoveToFirst(documentID, documentData);
-            if(documentCache.size() > DOCUMENT_DATA_CACHE_SIZE) {
-              documentCache.removeLast();
+        synchronized(collectionFiles) {
+          files: for(CollectionFile aColFile : collectionFiles) {
+            if(aColFile.containsDocument(documentID)) {
+              // we found the collection file containing the document
+              documentData = aColFile.getDocumentData(nextDocumentId);
+              documentCache.putAndMoveToFirst(documentID, documentData);
+              if(documentCache.size() > DOCUMENT_DATA_CACHE_SIZE) {
+                documentCache.removeLast();
+              }
+              break files;
             }
-            break files;
-          }
+          } 
         }
       }
     }
@@ -440,15 +457,15 @@
     try{
       boolean success = false;
       while(!success) {
-        success = collectionFileWriter.writeDocumentData(
-            Long.toString(nextDocumentId), document);
+        success = collectionFileWriter.writeDocumentData(nextDocumentId, 
+            document);
         if(!success) {
           // the current collection file is full: close it
           collectionFileWriter.close();
           synchronized(collectionFiles) {
             // open the newly saved zip file
             collectionFiles.add(new 
CollectionFile(collectionFileWriter.zipFile));
-            inputBuffer.clear();
+            
           }
           // open a new one and try again
           openCollectionWriter();
@@ -457,8 +474,6 @@
     } catch(IOException e){
       throw new IndexException("Problem while accessing the collection file", 
e);
     } finally {
-      // save the document data to the input buffer
-      inputBuffer.put(nextDocumentId, document);
       nextDocumentId++;
     }
   }
@@ -506,63 +521,118 @@
     documentCache.clear();
   }
   
-  
-  protected void compact() throws ZipException, IOException {
-    
-    ZipOutputStream outputStream = null;
-    long outFileSize = 0;
-    int outFileEntries = 0;
-    for(File inputFile : 
indexDirectory.listFiles(CollectionFile.FILENAME_FILTER)) {
-      ZipFile inputZipFile = new ZipFile(inputFile);
-      if(outputStream == null) {
-        // we're not currently writing because all files so far have been OK
-        if(inputZipFile.size() < ZIP_FILE_MAX_ENTRIES &&
-           inputFile.length() < ZIP_FILE_MAX_SIZE) {
-          // the current file is too small: we need to add more entries to it
-          
-          //this becomes the first out file
-          // TODO
-          //MKDIR out dir
-          // mv file to out
-          // open file for writing
-          outFileEntries = inputZipFile.size();
-          inputZipFile.close();
-          outFileSize = inputFile.length();
-          outputStream = new ZipOutputStream(new BufferedOutputStream(
-              new  FileOutputStream(inputFile)));
-        }        
-      } else {
-        // we're currently writing to an output file: we need to copy all the
-        // entries in the new input file
-        Enumeration<? extends ZipEntry> inputEntries = inputZipFile.entries();
-        while(inputEntries.hasMoreElements()) {
-          if(outFileEntries > ZIP_FILE_MAX_ENTRIES ||
-              outFileSize > ZIP_FILE_MAX_SIZE) {
-            // we need to move on to the next zip output file
-            // TODO
+  /**
+   * Combines multiple smaller collection files into larger ones. If multiple
+   * consecutive collection files can be combined without exceeding the maximum
+   * permitted sizes ({@link #ZIP_FILE_MAX_ENTRIES} and 
+   * {@link #ZIP_FILE_MAX_SIZE}), then they are combined.
+   * 
+   * @throws ZipException
+   * @throws IOException
+   * @throws IndexException 
+   */
+  public void compact() throws ZipException, IOException, IndexException {
+    // find an interval of files that can be joined together
+    // we search from the end toward the start so that we can modify the 
+    // list without changing the yet-unvisited IDs.
+    CollectionFile[] colFilesArr = collectionFiles.toArray(
+        new CollectionFile[collectionFiles.size()]);
+    int intervalEnd = -1;
+    int intervalLength = 0;
+    int intervalEntries = 0;
+    long intervalBytes = 0;
+    for(int i = colFilesArr.length -1; i >= 0; i--) {
+      // is the current file small?
+      boolean smallFile = 
+          colFilesArr[i].documentCount < ZIP_FILE_MAX_ENTRIES &&
+          colFilesArr[i].length < ZIP_FILE_MAX_SIZE;
+      if(intervalEnd < 0) { // we're looking for the first 'small' file
+        if(smallFile) {
+          // we found a small file: start a new interval
+          intervalEnd = i;
+          intervalLength = 1;
+          intervalEntries = colFilesArr[i].documentCount;
+          intervalBytes = colFilesArr[i].length;
+        }
+      } else { // we're trying to extend the current interval
+        boolean currentFileAccepted = 
+            intervalEntries + colFilesArr[i].documentCount < 
ZIP_FILE_MAX_ENTRIES &&
+            intervalBytes + colFilesArr[i].length < ZIP_FILE_MAX_SIZE;
+        if(currentFileAccepted) {
+          // extend the current interval
+          intervalEntries += colFilesArr[i].documentCount;
+          intervalBytes += colFilesArr[i].length;
+          intervalLength = intervalEnd - i + 1;
+        }
+        if(!currentFileAccepted || i == 0) {
+          // end the current interval
+          if(intervalLength > 1){
+            int intervalStart = intervalEnd - intervalLength + 1;
+            // combine the files
+            // create the new file
+            String newFileName = "temp-" + 
+                CollectionFile.MIMIR_COLLECTION_BASENAME + 
+                intervalStart + "-" + intervalEnd + 
+                CollectionFile.MIMIR_COLLECTION_EXTENSION;
+            File newZipFile = new File(indexDirectory, newFileName);
+            ZipOutputStream  zos = new ZipOutputStream(new 
BufferedOutputStream(
+                      new  FileOutputStream(newZipFile)));
+            byte[] buff = new byte[1024 * 1024];
+            for(int j = intervalStart; j <= intervalEnd; j++) {
+              Enumeration<? extends ZipEntry> entries = 
+                  colFilesArr[j].zipFile.entries();
+              while(entries.hasMoreElements()) {
+                ZipEntry anEntry = entries.nextElement();
+                zos.putNextEntry(new ZipEntry(anEntry));
+                InputStream is = 
colFilesArr[j].zipFile.getInputStream(anEntry);
+                int read = is.read(buff);
+                while(read >= 0){
+                  zos.write(buff, 0, read);
+                  read = is.read(buff);
+                }
+                zos.closeEntry();
+              }
+            }
+            zos.close();
+            
+            //update the collection
+            synchronized(colFilesArr) {
+              //confirm that the collection files have not changed since we 
started
+              for(int j = intervalStart; j <= intervalEnd; j++) {
+                if(colFilesArr[j] != collectionFiles.get(j)) {
+                  logger.warn("Collection files have changed since the "
+                      + "compacting operation started. Compact aborted.");
+                  // delete the newly created collection file
+                  newZipFile.delete();
+                  return;
+                }
+              }
+              // replace the old files with the new one
+              File newCollectionFile = new File(indexDirectory, 
+                  CollectionFile.getCollectionFileName(
+                      Integer.toString(intervalStart) + "-" + 
+                      Integer.toString(intervalEnd)));
+              newZipFile.renameTo(newCollectionFile);
+              for(int j = intervalStart; j <= intervalEnd; j++) {
+                CollectionFile oldColFile = 
collectionFiles.remove(intervalStart);
+                if(!oldColFile.file.delete()) {
+                  throw new IndexException(
+                      "Could not delete old collection file " + 
+                      oldColFile.file + "! " + 
+                      "Document collection now inconsistent.");
+                }
+              }
+              collectionFiles.add(intervalStart, new 
CollectionFile(newCollectionFile));
+            }
+          } else {
+            // we only found an interval of length 1: start again
+            intervalEnd = -1;
+            intervalLength = 0;
+            intervalEntries = 0;
+            intervalBytes = 0;
           }
-          
-          
-          ZipEntry inputEntry = inputEntries.nextElement();
-          ZipEntry outputEntry = new ZipEntry(inputEntry);
-          outputStream.putNextEntry(outputEntry);
-          //write the data
-          byte[] buf = new byte[1024 * 1024];
-          InputStream is = inputZipFile.getInputStream(inputEntry);
-          int read = is.read(buf);
-          while(read >= 0) {
-            outputStream.write(buf, 0, read);
-            read = is.read(buf);
-          }
-          outputStream.closeEntry();
-          outFileSize += outputEntry.getCompressedSize();
-          outFileEntries++;
-          
-
         }
       }
-
-
     }
   }
 }
\ No newline at end of file

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
WatchGuard Dimension instantly turns raw network data into actionable 
security intelligence. It gives you real-time visual feedback on key
security issues and trends.  Skip the complicated setup - simply import
a virtual appliance and go from zero to informed in seconds.
http://pubads.g.doubleclick.net/gampad/clk?id=123612991&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to