Revision: 17251
http://sourceforge.net/p/gate/code/17251
Author: valyt
Date: 2014-01-28 16:57:52 +0000 (Tue, 28 Jan 2014)
Log Message:
-----------
Live-compactable document collection seems to work.
Modified Paths:
--------------
mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
Modified: mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
===================================================================
--- mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2014-01-28 12:43:58 UTC (rev 17250)
+++ mimir/branches/5.0/mimir-core/src/gate/mimir/MimirIndex.java
2014-01-28 16:57:52 UTC (rev 17251)
@@ -46,9 +46,11 @@
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipException;
import org.apache.log4j.Logger;
@@ -410,6 +412,9 @@
return futures;
}
+  public void compactDocumentCollection() throws ZipException, IOException, IndexException {
+    documentCollection.compact();
+  }
/**
* Notifies this index that more occurrences have been stored in RAM by one
of
Modified:
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
===================================================================
---
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
2014-01-28 12:43:58 UTC (rev 17250)
+++
mimir/branches/5.0/mimir-core/src/gate/mimir/index/mg4j/zipcollection/DocumentCollection.java
2014-01-28 16:57:52 UTC (rev 17251)
@@ -57,13 +57,7 @@
*/
protected static final int DOCUMENT_DATA_CACHE_SIZE = 100;
- /**
-   * The number of documents kept in memory until a new zip file is written. As
- * new documents are submitted, they get written to the currently open zip
- * file but they cannot be read from the file. To account for this, we keep
- * them in memory, in the {@link #inputBuffer} structure.
- */
- protected static final int INPUT_BUFFER_SIZE = 100;
+
/**
* Class representing one of the collection (zip) files.
@@ -90,7 +84,7 @@
* </ul>
*/
protected static final Pattern MIMIR_COLLECTION_PATTERN = Pattern.compile(
-      "\\Q" + MIMIR_COLLECTION_BASENAME + "\\E((\\d+)(?:-([a-zA-Z]+))?)\\Q"+
+      "\\Q" + MIMIR_COLLECTION_BASENAME + "\\E((\\d+)(?:-([-0-9a-zA-Z]+))?)\\Q"+
       MIMIR_COLLECTION_EXTENSION + "\\E");
protected static FilenameFilter FILENAME_FILTER = new FilenameFilter() {
@@ -100,6 +94,9 @@
}
};
+
+ protected File file;
+
protected ZipFile zipFile;
protected long firstEntry;
@@ -147,8 +144,9 @@
}
public CollectionFile(File file) throws ZipException, IOException {
-
- zipFile = new ZipFile(file);
+ this.file = file;
+ zipFile = new ZipFile(file);
+ collectionFileNumber = getCollectionFileNumber(file.getName());
Enumeration<? extends ZipEntry> entries = zipFile.entries();
firstEntry = Long.MAX_VALUE;
lastEntry = -1;
@@ -209,8 +207,23 @@
* Class that handles the creation of collection files.
*/
protected static class CollectionFileWriter {
-
/**
+     * The number of documents kept in memory until a new zip file is written. As
+     * new documents are submitted, they get written to the currently open zip
+     * file but they cannot be read from the file. To account for this, we keep
+     * them in memory, in the {@link #inputBuffer} structure.
+ */
+ protected static final int INPUT_BUFFER_SIZE = 10;
+
+
+ /**
+ * Document data objects that have been written to the zip file currently
+     * being created and have to be kept in RAM until the file is closed and can
+     * be open in read mode.
+ */
+ protected Long2ObjectLinkedOpenHashMap<DocumentData> inputBuffer;
+
+ /**
* The zip file managed by this collection.
*/
protected ZipOutputStream zipOuputStream;
@@ -251,6 +264,7 @@
}
currentEntries = 0;
currentLength = 0;
+ inputBuffer = new Long2ObjectLinkedOpenHashMap<DocumentData>();
}
/**
@@ -262,7 +276,7 @@
*
* @throws IOException
*/
-    public boolean writeDocumentData(String entryName, DocumentData document) throws IOException {
+    public boolean writeDocumentData(long documentId, DocumentData document) throws IOException {
//write the new document to the byte array
ObjectOutputStream objectOutStream = new ObjectOutputStream(byteArrayOS);
objectOutStream.writeObject(document);
@@ -270,10 +284,11 @@
// check if this will take us over size
if(currentLength + byteArrayOS.size() > ZIP_FILE_MAX_SIZE ||
- currentEntries >= ZIP_FILE_MAX_ENTRIES) return false;
+ currentEntries >= ZIP_FILE_MAX_ENTRIES ||
+ inputBuffer.size() >= INPUT_BUFFER_SIZE) return false;
// create a new entry in the current zip file
- ZipEntry entry = new ZipEntry(entryName);
+ ZipEntry entry = new ZipEntry(Long.toString(documentId));
zipOuputStream.putNextEntry(entry);
//write the data
byteArrayOS.writeTo(zipOuputStream);
@@ -283,11 +298,14 @@
//clean up the byte array for next time
byteArrayOS.reset();
currentEntries++;
+ // save the document data to the input buffer
+ inputBuffer.put(documentId, document);
return true;
}
public void close() throws IOException {
if(zipOuputStream != null) zipOuputStream.close();
+ inputBuffer.clear();
}
}
@@ -312,12 +330,7 @@
*/
protected Long2ObjectLinkedOpenHashMap<DocumentData> documentCache;
- /**
- * Document data objects that have been written to the zip file currently
- * being created and have to be kept in RAM until the file is closed and can
- * be open in read mode.
- */
- protected Long2ObjectLinkedOpenHashMap<DocumentData> inputBuffer;
+
/**
* Flag that gets set to true when the collection is closed (and blocks all
@@ -383,7 +396,7 @@
// prepare for writing
nextDocumentId = collectionFiles.isEmpty() ? 0 :
(collectionFiles.get(collectionFiles.size() - 1).lastEntry + 1);
- inputBuffer = new Long2ObjectLinkedOpenHashMap<DocumentData>();
+
}
@@ -401,23 +414,27 @@
if(collectionFiles.isEmpty() ||
documentID > collectionFiles.get(collectionFiles.size() - 1).lastEntry)
{
// it's a new document that's not yet available from the zip files
- documentData = inputBuffer.get(documentID);
+ documentData = collectionFileWriter != null ?
+ collectionFileWriter.inputBuffer.get(documentID):
+ null;
} else {
// it's an old document. Try the cache first
documentData = documentCache.getAndMoveToFirst(documentID);
if(documentData == null) {
// cache miss: we need to actually load it
//locate the right zip file
- files: for(CollectionFile aColFile : collectionFiles) {
- if(aColFile.containsDocument(documentID)) {
- // we found the collection file containing the document
- documentData = aColFile.getDocumentData(nextDocumentId);
- documentCache.putAndMoveToFirst(documentID, documentData);
- if(documentCache.size() > DOCUMENT_DATA_CACHE_SIZE) {
- documentCache.removeLast();
+ synchronized(collectionFiles) {
+ files: for(CollectionFile aColFile : collectionFiles) {
+ if(aColFile.containsDocument(documentID)) {
+ // we found the collection file containing the document
+ documentData = aColFile.getDocumentData(nextDocumentId);
+ documentCache.putAndMoveToFirst(documentID, documentData);
+ if(documentCache.size() > DOCUMENT_DATA_CACHE_SIZE) {
+ documentCache.removeLast();
+ }
+ break files;
}
- break files;
- }
+ }
}
}
}
@@ -440,15 +457,15 @@
try{
boolean success = false;
while(!success) {
-        success = collectionFileWriter.writeDocumentData(Long.toString(nextDocumentId), document);
+        success = collectionFileWriter.writeDocumentData(nextDocumentId, document);
if(!success) {
// the current collection file is full: close it
collectionFileWriter.close();
synchronized(collectionFiles) {
// open the newly saved zip file
           collectionFiles.add(new CollectionFile(collectionFileWriter.zipFile));
- inputBuffer.clear();
+
}
// open a new one and try again
openCollectionWriter();
@@ -457,8 +474,6 @@
} catch(IOException e){
throw new IndexException("Problem while accessing the collection file",
e);
} finally {
- // save the document data to the input buffer
- inputBuffer.put(nextDocumentId, document);
nextDocumentId++;
}
}
@@ -506,63 +521,118 @@
documentCache.clear();
}
-
- protected void compact() throws ZipException, IOException {
-
- ZipOutputStream outputStream = null;
- long outFileSize = 0;
- int outFileEntries = 0;
-    for(File inputFile : indexDirectory.listFiles(CollectionFile.FILENAME_FILTER)) {
- ZipFile inputZipFile = new ZipFile(inputFile);
- if(outputStream == null) {
- // we're not currently writing because all files so far have been OK
- if(inputZipFile.size() < ZIP_FILE_MAX_ENTRIES &&
- inputFile.length() < ZIP_FILE_MAX_SIZE) {
- // the current file is too small: we need to add more entries to it
-
- //this becomes the first out file
- // TODO
- //MKDIR out dir
- // mv file to out
- // open file for writing
- outFileEntries = inputZipFile.size();
- inputZipFile.close();
- outFileSize = inputFile.length();
- outputStream = new ZipOutputStream(new BufferedOutputStream(
- new FileOutputStream(inputFile)));
- }
- } else {
- // we're currently writing to an output file: we need to copy all the
- // entries in the new input file
- Enumeration<? extends ZipEntry> inputEntries = inputZipFile.entries();
- while(inputEntries.hasMoreElements()) {
- if(outFileEntries > ZIP_FILE_MAX_ENTRIES ||
- outFileSize > ZIP_FILE_MAX_SIZE) {
- // we need to move on to the next zip output file
- // TODO
+ /**
+ * Combines multiple smaller collection files into larger ones. If multiple
+ * consecutive collection files can be combined without exceeding the maximum
+ * permitted sizes ({@link #ZIP_FILE_MAX_ENTRIES} and
+ * {@link #ZIP_FILE_MAX_SIZE}), then they are combined.
+ *
+ * @throws ZipException
+ * @throws IOException
+ * @throws IndexException
+ */
+ public void compact() throws ZipException, IOException, IndexException {
+ // find an interval of files that can be joined together
+ // we search from the end toward the start so that we can modify the
+ // list without changing the yet-unvisited IDs.
+ CollectionFile[] colFilesArr = collectionFiles.toArray(
+ new CollectionFile[collectionFiles.size()]);
+ int intervalEnd = -1;
+ int intervalLength = 0;
+ int intervalEntries = 0;
+ long intervalBytes = 0;
+ for(int i = colFilesArr.length -1; i >= 0; i--) {
+ // is the current file small?
+ boolean smallFile =
+ colFilesArr[i].documentCount < ZIP_FILE_MAX_ENTRIES &&
+ colFilesArr[i].length < ZIP_FILE_MAX_SIZE;
+ if(intervalEnd < 0) { // we're looking for the first 'small' file
+ if(smallFile) {
+ // we found a small file: start a new interval
+ intervalEnd = i;
+ intervalLength = 1;
+ intervalEntries = colFilesArr[i].documentCount;
+ intervalBytes = colFilesArr[i].length;
+ }
+ } else { // we're trying to extend the current interval
+ boolean currentFileAccepted =
+          intervalEntries + colFilesArr[i].documentCount < ZIP_FILE_MAX_ENTRIES &&
+ intervalBytes + colFilesArr[i].length < ZIP_FILE_MAX_SIZE;
+ if(currentFileAccepted) {
+ // extend the current interval
+ intervalEntries += colFilesArr[i].documentCount;
+ intervalBytes += colFilesArr[i].length;
+ intervalLength = intervalEnd - i + 1;
+ }
+ if(!currentFileAccepted || i == 0) {
+ // end the current interval
+ if(intervalLength > 1){
+ int intervalStart = intervalEnd - intervalLength + 1;
+ // combine the files
+ // create the new file
+ String newFileName = "temp-" +
+ CollectionFile.MIMIR_COLLECTION_BASENAME +
+ intervalStart + "-" + intervalEnd +
+ CollectionFile.MIMIR_COLLECTION_EXTENSION;
+ File newZipFile = new File(indexDirectory, newFileName);
+          ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(
+              new FileOutputStream(newZipFile)));
+ byte[] buff = new byte[1024 * 1024];
+ for(int j = intervalStart; j <= intervalEnd; j++) {
+ Enumeration<? extends ZipEntry> entries =
+ colFilesArr[j].zipFile.entries();
+ while(entries.hasMoreElements()) {
+ ZipEntry anEntry = entries.nextElement();
+ zos.putNextEntry(new ZipEntry(anEntry));
+              InputStream is = colFilesArr[j].zipFile.getInputStream(anEntry);
+ int read = is.read(buff);
+ while(read >= 0){
+ zos.write(buff, 0, read);
+ read = is.read(buff);
+ }
+ zos.closeEntry();
+ }
+ }
+ zos.close();
+
+ //update the collection
+ synchronized(colFilesArr) {
+            //confirm that the collection files have not changed since we started
+ for(int j = intervalStart; j <= intervalEnd; j++) {
+ if(colFilesArr[j] != collectionFiles.get(j)) {
+ logger.warn("Collection files have changed since the "
+ + "compacting operation started. Compact aborted.");
+ // delete the newly created collection file
+ newZipFile.delete();
+ return;
+ }
+ }
+ // replace the old files with the new one
+ File newCollectionFile = new File(indexDirectory,
+ CollectionFile.getCollectionFileName(
+ Integer.toString(intervalStart) + "-" +
+ Integer.toString(intervalEnd)));
+ newZipFile.renameTo(newCollectionFile);
+ for(int j = intervalStart; j <= intervalEnd; j++) {
+              CollectionFile oldColFile = collectionFiles.remove(intervalStart);
+ if(!oldColFile.file.delete()) {
+ throw new IndexException(
+ "Could not delete old collection file " +
+ oldColFile.file + "! " +
+ "Document collection now inconsistent.");
+ }
+ }
+          collectionFiles.add(intervalStart, new CollectionFile(newCollectionFile));
+ }
+ } else {
+ // we only found an interval of length 1: start again
+ intervalEnd = -1;
+ intervalLength = 0;
+ intervalEntries = 0;
+ intervalBytes = 0;
}
-
-
- ZipEntry inputEntry = inputEntries.nextElement();
- ZipEntry outputEntry = new ZipEntry(inputEntry);
- outputStream.putNextEntry(outputEntry);
- //write the data
- byte[] buf = new byte[1024 * 1024];
- InputStream is = inputZipFile.getInputStream(inputEntry);
- int read = is.read(buf);
- while(read >= 0) {
- outputStream.write(buf, 0, read);
- read = is.read(buf);
- }
- outputStream.closeEntry();
- outFileSize += outputEntry.getCompressedSize();
- outFileEntries++;
-
-
}
}
-
-
}
}
}
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
WatchGuard Dimension instantly turns raw network data into actionable
security intelligence. It gives you real-time visual feedback on key
security issues and trends. Skip the complicated setup - simply import
a virtual appliance and go from zero to informed in seconds.
http://pubads.g.doubleclick.net/gampad/clk?id=123612991&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs