Hi Jake
Ok. The number of file handles left open is increasing rapidly. For
instance, 4200 file handles were left open by Lucene 2.9.1 over a period
of 16 min. You can see in the attached snapshot a picture from JPicus
showing the file handles that are left open. These index files are
deleted but the OS still holds references to them. Could it be that
Lucene merge threads are not closing files correctly before they are
deleted? More than likely, it is an error with our code, but where? Our
LuceneIndex wrapper class is attached. If I set the OS limit on open files
to a low figure, my application stops in its tracks, so this is definitely a
critical issue that must be resolved.
Jamie
On 2010/01/27 10:24 AM, Jake Mannix wrote:
On Wed, Jan 27, 2010 at 12:17 AM, Jamie<ja...@stimulussoft.com> wrote:
Oh! Re-reading your initial post - you're just seeing lots of files which
haven't quite yet been cleaned up during indexing, it looks like, yes? There
are threads going on in the background which are merging segments and
deleting old files; these should go away over time.
Yes, but they do not. They just keep growing over time until the file
handle count is exhausted.
I can see from the JPicus utility that although these index files are
deleted, the OS still holds references to them.
Do you see that they are still around after a very long period? How high
does the file count grow?
package com.stimulus.archiva.index;
import com.stimulus.util.*;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.domain.Indexer;
import com.stimulus.archiva.domain.Volume;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;
public class LuceneIndex extends Thread {
// Documents waiting to be indexed: filled by indexDocument(), drained by IndexProcessor.
protected ArrayBlockingQueue<LuceneDocument> queue;
// General-purpose logger for this class.
protected static final Log logger =
LogFactory.getLog(LuceneIndex.class.getName());
// Log named "indexlog"; openIndex() only attaches the info stream when its debug level is enabled.
protected static final Log indexLog =
LogFactory.getLog("indexlog");
// The open Lucene writer; null means the index is currently closed (see openIndex()/closeIndex()).
IndexWriter writer = null;
// NOTE(review): scheduler and scheduledTask are not referenced anywhere in the visible part of this class.
protected static ScheduledExecutorService scheduler;
protected static ScheduledFuture<?> scheduledTask;
// Sentinel document, compared by identity, that tells IndexProcessor to stop; set by the constructor.
protected LuceneDocument EXIT_REQ = null;
// Held around every writer operation (batch write, close, optimize, delete).
ReentrantLock indexLock = new ReentrantLock();
// Analyzer handed to the IndexWriter constructor.
ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
// Index log file and the stream opened on it; both are assigned in setLog().
File indexLogFile;
PrintStream indexLogOut;
// Background consumer of the queue; created and started in startup().
IndexProcessor indexProcessor;
// Name used in log messages and as the prefix of the index log file name.
String friendlyName;
// Directory of the Lucene index on disk.
String indexPath;
// Upper bound on the number of documents IndexProcessor writes before committing.
int maxSimultaneousDocs;
// Size of the per-batch writer thread pool; also the number of EXIT_REQ markers shutdown() enqueues.
int indexThreads;
/**
 * Creates an index wrapper with a bounded document queue and opens its index log file.
 * The index itself is not opened here; see {@link #startup()}.
 *
 * @param queueSize           capacity of the queue that feeds the index processor
 * @param exitReq             sentinel document that asks the index processor to stop
 * @param friendlyName        name used in log messages and in the index log file name
 * @param indexPath           directory of the Lucene index
 * @param maxSimultaneousDocs maximum number of documents written per batch
 * @param indexThreads        number of threads used to write one batch
 */
public LuceneIndex(int queueSize, LuceneDocument exitReq,
        String friendlyName, String indexPath, int maxSimultaneousDocs, int indexThreads) {
    this.indexThreads = indexThreads;
    this.maxSimultaneousDocs = maxSimultaneousDocs;
    this.indexPath = indexPath;
    this.friendlyName = friendlyName;
    this.EXIT_REQ = exitReq;
    this.queue = new ArrayBlockingQueue<LuceneDocument>(queueSize);
    setLog(friendlyName);
}
/**
 * @return the maximum number of documents written to the index in one batch
 */
public int getMaxSimultaneousDocs() {
    return this.maxSimultaneousDocs;
}
/**
 * Sets the maximum number of documents written to the index in one batch.
 *
 * @param maxSimultaneousDocs the new batch size limit
 */
public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
    final int limit = maxSimultaneousDocs;
    this.maxSimultaneousDocs = limit;
}
/**
 * @return the lock this class holds around its index writer operations
 */
public ReentrantLock getIndexLock() {
    return this.indexLock;
}
protected void setLog(String logName) {
try {
indexLogFile = getIndexLogFile(logName);
if (indexLogFile!=null) {
if (indexLogFile.length()>10485760)
indexLogFile.delete();
indexLogOut = new
PrintStream(indexLogFile);
}
logger.debug("set index log file path
{path='"+indexLogFile.getCanonicalPath()+"'}");
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
}
}
protected File getIndexLogFile(String logName) {
try {
String logfilepath =
Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
return new File(logfilepath);
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
return null;
}
}
protected void openIndex() throws MessageSearchException {
Exception lastError = null;
if (writer==null) {
logger.debug("openIndex() index "+friendlyName+" will
be opened. it is currently closed.");
} else {
logger.debug("openIndex() did not bother opening index
"+friendlyName+". it is already open.");
return;
}
logger.debug("opening index "+friendlyName+" for write");
logger.debug("opening search index "+friendlyName+" for
write {indexpath='"+indexPath+"'}");
boolean writelock;
int attempt = 0;
int maxattempt = 10;
if
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
maxattempt = 10000;
} else {
maxattempt = 10;
}
do {
writelock = false;
try {
FSDirectory fsDirectory =
FSDirectory.open(new File(indexPath));
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(fsDirectory,analyzer,new IndexWriter.MaxFieldLength(maxIndexChars));
if (indexLog.isDebugEnabled()
&& indexLogOut!=null) {
writer.setInfoStream(indexLogOut);
}
} catch (LockObtainFailedException lobfe) {
logger.debug("write lock on
index "+friendlyName+". will reopen in 50ms.");
try { Thread.sleep(50); } catch
(Exception e) {}
attempt++;
writelock = true;
} catch (CorruptIndexException cie) {
throw new MessageSearchException("index
"+friendlyName+" appears to be corrupt. please reindex the active
volume."+cie.getMessage(),logger);
} catch (Throwable io) {
throw new
MessageSearchException("failed to write document to index
"+friendlyName+":"+io.getMessage(),logger);
}
} while (writelock && attempt<maxattempt);
if (attempt>=10000)
throw new MessageSearchException("failed to open index
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
}
public void indexDocument(LuceneDocument luceneDocument) throws
MessageSearchException {
logger.debug("index document {"+luceneDocument+"}");
long s = (new Date()).getTime();
if (luceneDocument == null)
throw new MessageSearchException("assertion
failure: null document",logger);
try {
queue.put(luceneDocument);
} catch (InterruptedException ie) {
throw new MessageSearchException("failed to add
document to queue:"+ie.getMessage(),ie,logger);
}
logger.debug("document indexed successfully
{"+luceneDocument+"}");
logger.debug("indexing message end
{"+luceneDocument+"}");
long e = (new Date()).getTime();
logger.debug("indexing time {time='"+(e-s)+"'}");
}
public class DocWriter implements Runnable {
LuceneDocument doc;
String language;
LinkedList<LuceneDocument> pushbacks;
public DocWriter(LuceneDocument doc,String
language,LinkedList<LuceneDocument> pushbacks) {
this.doc = doc;
this.language = language;
this.pushbacks = pushbacks;
}
public void run() {
try {
writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(doc);
}
}
}
public class IndexProcessor extends Thread {
public IndexProcessor() {
setName("index processor");
}
public void run() {
boolean exit = false;
LuceneDocument luceneDocument = null;
LinkedList<LuceneDocument> pushbacks = new
LinkedList<LuceneDocument>();
while (!exit) {
try {
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
luceneDocument = null;
luceneDocument =
(LuceneDocument) queue.take();
indexLock.lock();
if (luceneDocument==EXIT_REQ) {
logger.debug("index
exit req received. exiting");
exit = true;
continue;
}
if (luceneDocument==null) {
logger.debug("index
info is null");
}
int i = 0;
ExecutorService threadPool =
Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFactory("indexwritepool"));
while(luceneDocument!=null &&
i<maxSimultaneousDocs) {
Document doc =
luceneDocument.getDocument();
String language
= doc.get("lang");
if
(language==null) {
language = Config.getConfig().getIndex().getIndexLanguage();
}
DocWriter
docWriter = new DocWriter(luceneDocument,language,pushbacks);
threadPool.submit(docWriter);
i++;
if
(i<maxSimultaneousDocs) {
luceneDocument
= (LuceneDocument) queue.poll();
if
(luceneDocument==null) {
logger.debug("index info is null");
}
if
(luceneDocument==EXIT_REQ) {
logger.debug("index exit req received. exiting (2)");
exit = true;
break;
}
}
}
threadPool.shutdown();
threadPool.awaitTermination(30,TimeUnit.MINUTES);
if (pushbacks.size()>0) {
for (LuceneDocument
pushback : pushbacks) {
try {
writer.addDocument(pushback.getDocument());
} catch
(IOException io) {
logger.error("failed to add document to index:"+io.getMessage(),io);
} catch
(AlreadyClosedException e) {
pushbacks.add(pushback);
}
i++;
}
}
try {
writer.commit();
} catch (Exception e) {
logger.error("failed to
commit index:"+e.getMessage(),e);
closeIndex();
openIndex();
}
} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage(),ie);
} finally {
indexLock.unlock();
}
}
}
public class IndexDocument extends Thread {
LuceneDocument luceneDocument = null;
List<LuceneDocument> pushbacks = null;
public IndexDocument(LuceneDocument
luceneDocument,List<LuceneDocument> pushbacks) {
this.luceneDocument =
luceneDocument;
this.pushbacks = pushbacks;
setName("index document");
}
public void run() {
try {
writer.addDocument(luceneDocument.getDocument());
} catch (IOException io) {
logger.error("failed to
add document to index:"+io.getMessage(),io);
} catch (AlreadyClosedException
e) {
pushbacks.add(luceneDocument);
} catch (Throwable t) {
logger.error("failed to add
document to index:"+t.getMessage(),t);
}
}};
}
protected void closeIndex() {
try {
indexLock.lock();
if (writer!=null) {
writer.close();
}
} catch (Throwable io) {
logger.error("failed to close index
writer:"+io.getMessage(),io);
} finally {
logger.debug("writer closed");
writer = null;
indexLock.unlock();
}
}
public void optimize() throws MessageSearchException {
logger.debug("optimize volume");
try {
indexLock.lock();
try {
writer.optimize(false);
} catch (Exception io) {
throw new
MessageSearchException("failed to optimize the
index:"+io.getMessage(),io,logger);
}
} catch (Throwable t) { // diskspace problems could
arise
logger.error("failed to optimize
index:"+t.getMessage(),t);
} finally {
indexLock.unlock();
}
}
public void deleteDocs(Term[] terms) throws
MessageSearchException {
logger.debug("delete docs");
try {
indexLock.lock();
openIndex();
try {
writer.deleteDocuments(terms);
} catch (Exception e) {
throw new
MessageSearchException("failed to delete doc from
index:"+e.getMessage(),e,logger);
} finally {
try {
writer.commit();
writer.expungeDeletes(false);
} catch (Exception io) {
throw new
MessageSearchException("failed to expunge docs from
index:"+io.getMessage(),io,logger);
}
}
} catch (Throwable t) {
logger.error("failed to delete docs from
index."+t.getMessage(),t);
} finally {
indexLock.unlock();
}
}
public void deleteIndex() throws MessageSearchException {
logger.debug("delete index
{indexpath='"+indexPath+"'}");
try {
indexLock.lock();
closeIndex();
try {
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(FSDirectory.open(new File(indexPath)),analyzer,true,new
IndexWriter.MaxFieldLength(maxIndexChars));
} catch (Throwable cie) {
logger.error("failed to delete
index {index='"+indexPath+"'}",cie);
return;
}
} finally {
openIndex();
indexLock.unlock();
}
}
public void startup() throws MessageSearchException {
logger.debug("luceneindex is starting up");
File lockFile = new File(indexPath+File.separatorChar +
"write.lock");
if (lockFile.exists()) {
if
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
logger.debug("index lock file detected
on volumeindex startup.");
} else {
logger.warn("index lock file detected.
the server was shutdown incorrectly. automatically deleting lock file.");
lockFile.delete();
}
}
openIndex();
indexProcessor = new IndexProcessor();
indexProcessor.start();
Runtime.getRuntime().addShutdownHook(this);
}
/**
 * Returns a reader obtained from the open index writer.
 *
 * Per Lucene's {@code IndexWriter.getReader()} documentation, the caller must
 * close the returned reader when done with it. NOTE(review): readers that are
 * never closed keep their segment files open even after merges delete those
 * files - worth checking every call site against the file-handle growth.
 *
 * @return a reader from the current writer; the caller is responsible for closing it
 * @throws IOException            if the writer fails to open the reader
 * @throws AlreadyClosedException if the index writer is not open
 */
public IndexReader getReader() throws IOException {
    // Read the field once: closeIndex() may null it concurrently.
    IndexWriter current = writer;
    if (current == null) {
        throw new AlreadyClosedException("index " + friendlyName + " is closed");
    }
    return current.getReader();
}
/**
 * Asks the index processor to stop by enqueueing {@code EXIT_REQ} (once per
 * configured index thread) and then closes the index writer. The writer is closed
 * even if the exit request cannot be queued.
 */
public void shutdown() {
    //logger.debug("volumeindex is shutting down");
    try {
        for (int i = 0; i < indexThreads; i++) {
            // offer() instead of add(): a full queue must not abort the shutdown.
            if (!queue.offer(EXIT_REQ)) {
                logger.warn("index queue is full. could not queue exit request for index " + friendlyName);
                break;
            }
        }
    } catch (RuntimeException e) {
        logger.error("failed to queue index exit request:" + e.getMessage(), e);
    } finally {
        closeIndex();
    }
}
/**
 * Thread body. {@code startup()} registers this object as a JVM shutdown hook,
 * so running it simply shuts the index down.
 */
@Override
public void run() {
    this.shutdown();
}
/**
 * A unit of work for the index queue: something that can supply a Lucene
 * {@link Document} to be written to the index.
 */
public interface LuceneDocument {

    /** @return a description of the document, used in log messages */
    String toString();

    /** @return the Lucene document to add to the index */
    Document getDocument();

    /** Declared public so that implementations must expose {@code finalize()} publicly. */
    void finalize();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org