My application also ran into this problem last year, and I researched the code 
and found the reason.
The whole process is as follows:
1. When using NRTCachingDirectory, it uses a RAMDirectory as the cache and an 
MMapDirectory as the delegate. New segments are created during flush or merge, 
and NRTCachingDirectory uses its maxMergeSizeBytes and maxCachedBytes 
parameters to decide whether to create the new segment in the cache (in memory) 
or in the delegate (on disk); see the sketch after this list.
2. When a flush creates a new segment, the context.flushInfo.estimatedSegmentSize 
of the new segment is compared against the parameters above. If the new segment 
is small, it is created in the RAMDirectory, otherwise in the MMapDirectory.
3. When a merge creates a new segment, the context.mergeInfo.estimatedMergeBytes 
of the new segment is compared against the parameters above. If the new segment 
is small, it is created in the cache, otherwise in the delegate.
4. But when the new segment is a compound index file (.cfs), whether it comes 
from a flush or a merge, IOContext.DEFAULT is used for it, and with 
IOContext.DEFAULT neither estimatedMergeBytes nor estimatedSegmentSize is 
available (mergeInfo and flushInfo are both null). As a result, the new compound 
segment file is always created in the cache, no matter how big it really is. 
This is the core issue.
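
For reference, the decision described in points 1-3 looks roughly like this (a 
simplified sketch adapted from my reading of the 4.x 
NRTCachingDirectory.doCacheWrite method, not an exact copy of the source):

protected boolean doCacheWrite(String name, IOContext context) {
  long bytes = 0;
  if (context.mergeInfo != null) {
    bytes = context.mergeInfo.estimatedMergeBytes;     // segment produced by a merge
  } else if (context.flushInfo != null) {
    bytes = context.flushInfo.estimatedSegmentSize;    // segment produced by a flush
  }
  // Small segments go to the RAMDirectory cache, big ones straight to the delegate.
  // With IOContext.DEFAULT both infos are null, so bytes stays 0 and the size check
  // can never push a large compound file out to disk (the problem in point 4).
  return (bytes <= maxMergeSizeBytes) && (bytes + cache.sizeInBytes()) <= maxCachedBytes;
}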
 
Next I will explain the mechanism for releasing the segments held in the cache.
1. Normally, during a commit, the sync operation flushes the newly created 
segment files to disk and deletes them from the cache. But if a merge is 
running while the sync happens, the segment created by that merge will not be 
synced to disk in this commit, and the new merged compound segment file will 
have been created in the cache as described above.
2. When using the NRT feature, the IndexSearcher gets SegmentReaders from the 
IndexWriter via the getReader method, and there is a ReaderPool inside the 
IndexWriter. A new segment is first fetched from the cache of the 
NRTCachingDirectory; if it is not in the cache (because it was created directly 
on disk, or a commit already released it from the cache), it is fetched from 
the delegate. The fetched segment is then kept in the IndexWriter's ReaderPool. 
As described above, the new segment created by the merge now lives in the 
cache, and once it is fetched by the IndexWriter it is referenced by the 
ReaderPool. During the next commit this segment is synced to disk and released 
from the cache, but it is still referenced by the ReaderPool, so the 
IndexSearcher ends up referencing a lot of RAMFiles that are already on disk. 
When can these RAMFiles be dropped? Only when those segments take part in a new 
merge that produces a new segment are the old ones completely released from the 
IndexWriter's ReaderPool.
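
To show how this situation arises, here is a minimal sketch of the NRT setup 
involved (written against the Lucene 4.x API; the path, version constant, and 
cache sizes are placeholders of my own, not from the original problem):

import java.io.File;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Version;

public class NrtCacheExample {
  public static void main(String[] args) throws Exception {
    Directory fsDir = new MMapDirectory(new File("/tmp/index"));          // delegate on disk
    NRTCachingDirectory dir = new NRTCachingDirectory(fsDir, 5.0, 60.0);  // maxMergeSizeMB, maxCachedMB

    IndexWriter writer = new IndexWriter(dir,
        new IndexWriterConfig(Version.LUCENE_46, new StandardAnalyzer(Version.LUCENE_46)));

    // NRT reader: segments still sitting in the RAM cache are read from the cache,
    // and the SegmentReaders are pooled inside the IndexWriter (the ReaderPool).
    DirectoryReader reader = DirectoryReader.open(writer, true);

    // ... add documents, reopen via DirectoryReader.openIfChanged(reader, writer, true) ...

    // commit() syncs the cached files to disk and evicts them from the RAM cache,
    // but SegmentReaders pooled before the commit keep holding the old RAMFiles
    // until those segments are merged away.
    writer.commit();

    reader.close();
    writer.close();
  }
}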
 
I modified the Lucene source code to solve this problem, in the 
CompoundFileWriter class:

out = new DirectCFSIndexOutput(getOutput(), entry, false);         // original
out = new DirectCFSIndexOutput(getOutput(context), entry, false);  // modified

The full modified createOutput and getOutput methods:

IndexOutput createOutput(String name, IOContext context) throws IOException {
  ensureOpen();
  boolean success = false;
  boolean outputLocked = false;
  try {
    assert name != null : "name must not be null";
    if (entries.containsKey(name)) {
      throw new IllegalArgumentException("File " + name + " already exists");
    }
    final FileEntry entry = new FileEntry();
    entry.file = name;
    entries.put(name, entry);
    final String id = IndexFileNames.stripSegmentName(name);
    assert !seenIDs.contains(id) : "file=\"" + name + "\" maps to id=\"" + id + "\", which was already written";
    seenIDs.add(id);
    final DirectCFSIndexOutput out;
    if ((outputLocked = outputTaken.compareAndSet(false, true))) {
      // out = new DirectCFSIndexOutput(getOutput(), entry, false);     // original
      out = new DirectCFSIndexOutput(getOutput(context), entry, false); // modified: pass the caller's context through
    } else {
      entry.dir = this.directory;
      if (directory.fileExists(name)) {
        throw new IllegalArgumentException("File " + name + " already exists");
      }
      out = new DirectCFSIndexOutput(directory.createOutput(name, context), entry, true);
    }
    success = true;
    return out;
  } finally {
    if (!success) {
      entries.remove(name);
      if (outputLocked) { // release the output lock if not successful
        assert outputTaken.get();
        releaseOutputLock();
      }
    }
  }
}

private synchronized IndexOutput getOutput(IOContext context) throws IOException {
  if (dataOut == null) {
    boolean success = false;
    try {
      dataOut = directory.createOutput(dataFileName, context); // modified: use the passed-in context
      CodecUtil.writeHeader(dataOut, DATA_CODEC, VERSION_CURRENT);
      success = true;
    } finally {
      if (!success) {
        IOUtils.closeWhileHandlingException(dataOut);
      }
    }
  }
  return dataOut;
}
 
