Hi Jake

Ok. The number of file handles left open is increasing rapidly. For instance, 4200 file handles were left open by Lucene 2.9.1 over a period of 16 min. You can see in the attached snapshot a picture from JPicus showing the file handles that are left open. These index files are deleted but the OS still holds references to them. Could it be that Lucene merge threads are not closing files correctly before they are deleted? More than likely, it is an error with our code, but where? Our LuceneIndex wrapper class is attached. If I set the max file OS count to a low figure, my application stops in its track, so this is definitely a critical issue that must be resolved.

Jamie


On 2010/01/27 10:24 AM, Jake Mannix wrote:
On Wed, Jan 27, 2010 at 12:17 AM, Jamie<ja...@stimulussoft.com>  wrote:
Oh!  Re-reading your initial post - you're just seeing lots of files which
haven't quite yet
been cleaned up during indexing, it looks like, yes?  There are threads
going on in the
background which are merging segments and deleting old files, these should
go away
over time.

Yes, but they do not. They just keep growing over time until the file handle count is exhausted.
I can see from the JPicus utility that although these
Do you see that they are still around after a very long period?  How high
does the file count grow?



package com.stimulus.archiva.index;
import com.stimulus.util.*;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.domain.Indexer;
import com.stimulus.archiva.domain.Volume;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;

public class LuceneIndex extends Thread {
        
                 protected ArrayBlockingQueue<LuceneDocument> queue;
                 protected static final Log logger = 
LogFactory.getLog(LuceneIndex.class.getName());
                 protected static final Log indexLog = 
LogFactory.getLog("indexlog");
                 IndexWriter writer = null;
                 protected static ScheduledExecutorService scheduler;
                 protected static ScheduledFuture<?> scheduledTask;
                 protected LuceneDocument EXIT_REQ = null;
                 ReentrantLock indexLock = new ReentrantLock();
                 ArchivaAnalyzer analyzer       = new ArchivaAnalyzer();
                 File indexLogFile;
                 PrintStream indexLogOut;
                 IndexProcessor indexProcessor;
                 String friendlyName;
                 String indexPath;
                 int maxSimultaneousDocs;
                 int indexThreads;
                 
                  public LuceneIndex(int queueSize, LuceneDocument exitReq, 
String friendlyName, String indexPath, int  maxSimultaneousDocs, int 
indexThreads) {
                          this.queue = new 
ArrayBlockingQueue<LuceneDocument>(queueSize);
                          this.EXIT_REQ = exitReq;
                          this.friendlyName = friendlyName;
                          this.indexPath = indexPath;
                          this.maxSimultaneousDocs = maxSimultaneousDocs;
                          this.indexThreads = indexThreads;
                          setLog(friendlyName);
                  }
                  
                  
                public int getMaxSimultaneousDocs() {
                        return maxSimultaneousDocs;
                }
                
                public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
                        this.maxSimultaneousDocs = maxSimultaneousDocs;
                }
                  
                  
                public ReentrantLock getIndexLock() {
                        return indexLock;
                }
          
                protected void setLog(String logName) {

                          try {
                                  indexLogFile = getIndexLogFile(logName);
                                  if (indexLogFile!=null) {
                                          if (indexLogFile.length()>10485760)
                                                  indexLogFile.delete();
                                          indexLogOut = new 
PrintStream(indexLogFile);
                                  }
                                  logger.debug("set index log file path 
{path='"+indexLogFile.getCanonicalPath()+"'}");
                          } catch (Exception e) {
                                  logger.error("failed to open index log 
file:"+e.getMessage(),e);
                          }
                }
                
                protected File getIndexLogFile(String logName) {
                         try {
                                  String logfilepath = 
Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
                                  return new File(logfilepath);
                          } catch (Exception e) {
                                  logger.error("failed to open index log 
file:"+e.getMessage(),e);
                                  return null;
                          }
                }
                
                
                
                  protected void openIndex() throws MessageSearchException {
                        Exception lastError = null;
                        
                if (writer==null) {
                        logger.debug("openIndex() index "+friendlyName+" will 
be opened. it is currently closed.");
                } else {
                        logger.debug("openIndex() did not bother opening index 
"+friendlyName+". it is already open.");
                        return; 
                }
                logger.debug("opening index "+friendlyName+" for write");
                        logger.debug("opening search index "+friendlyName+" for 
write {indexpath='"+indexPath+"'}");
                        boolean writelock;
                        int attempt = 0;
                        int maxattempt = 10;
                        
                        if 
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                                maxattempt = 10000;
                        } else {
                                maxattempt = 10;
                        }
                        
                        do {
                                writelock = false;
                                try {
                                                FSDirectory fsDirectory = 
FSDirectory.open(new File(indexPath));
                                                int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                                                writer = new 
IndexWriter(fsDirectory,analyzer,new IndexWriter.MaxFieldLength(maxIndexChars));
                                                if (indexLog.isDebugEnabled() 
&& indexLogOut!=null) {
                                                        
writer.setInfoStream(indexLogOut);
                                                }
                                } catch (LockObtainFailedException lobfe) {
                                                logger.debug("write lock on 
index "+friendlyName+". will reopen in 50ms.");
                                                try { Thread.sleep(50); } catch 
(Exception e) {}
                                                attempt++;
                                                writelock = true;
                                } catch (CorruptIndexException cie) {
                                        throw new MessageSearchException("index 
"+friendlyName+" appears to be corrupt. please reindex the active 
volume."+cie.getMessage(),logger);
                                } catch (Throwable io) {
                                        throw new 
MessageSearchException("failed to write document to index 
"+friendlyName+":"+io.getMessage(),logger);
                                }
                   } while (writelock && attempt<maxattempt);
                   if (attempt>=10000)
                         throw new MessageSearchException("failed to open index 
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
                }
                
                public void indexDocument(LuceneDocument luceneDocument) throws 
MessageSearchException {
                        logger.debug("index document {"+luceneDocument+"}");
                        long s = (new Date()).getTime();
                        if (luceneDocument == null)
                            throw new MessageSearchException("assertion 
failure: null document",logger);
                        try {
                                queue.put(luceneDocument);
                        } catch (InterruptedException ie) {
                                throw new MessageSearchException("failed to add 
document to queue:"+ie.getMessage(),ie,logger);
                        }
                        logger.debug("document indexed successfully 
{"+luceneDocument+"}");
                        
                        logger.debug("indexing message end 
{"+luceneDocument+"}");
                        long e = (new Date()).getTime();
                    logger.debug("indexing time {time='"+(e-s)+"'}");
                }
                
                public class DocWriter implements Runnable {
                        
                        LuceneDocument doc;
                        String language;
                        LinkedList<LuceneDocument> pushbacks;
                        
                        public DocWriter(LuceneDocument doc,String 
language,LinkedList<LuceneDocument> pushbacks) {
                                this.doc = doc;
                                this.language = language;
                                this.pushbacks = pushbacks;
                        }
                        
                        public void run() {
                                try {
                                        
writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
                                } catch (IOException io) {
                                        logger.error("failed to add document to 
index:"+io.getMessage(),io);
                                } catch (AlreadyClosedException e) {
                                        pushbacks.add(doc);
                                        
                        }
                        }
                        
                }
                
        
        public class IndexProcessor extends Thread {
                
                public IndexProcessor() {
                        setName("index processor");
                }
      
                        public void run() {
                                boolean exit = false;
                                LuceneDocument luceneDocument = null;   
                                LinkedList<LuceneDocument> pushbacks = new 
LinkedList<LuceneDocument>();
                            
                                while (!exit) {
                                        
                                        try {
                                                //documentPool = 
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
                                                luceneDocument = null;  
                                                luceneDocument = 
(LuceneDocument) queue.take();
                                                
                                                indexLock.lock();
                                                
                                                if (luceneDocument==EXIT_REQ) {
                                                        logger.debug("index 
exit req received. exiting");
                                                        exit = true;
                                                        continue;
                                                }
                                        
                                                if (luceneDocument==null) {
                                                        logger.debug("index 
info is null");
                                                }
                                                int i = 0;
                                                ExecutorService threadPool = 
Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFactory("indexwritepool"));
                                                
                                                while(luceneDocument!=null && 
i<maxSimultaneousDocs) {
                                                
                                                                Document doc = 
luceneDocument.getDocument();
                                                                String language 
= doc.get("lang");
                                                                if 
(language==null) {
                                                                        
language = Config.getConfig().getIndex().getIndexLanguage();
                                                                }
                                                                DocWriter 
docWriter = new DocWriter(luceneDocument,language,pushbacks);
                                                                
threadPool.submit(docWriter);
                                                                
                                                         i++;
                                                         if 
(i<maxSimultaneousDocs) {
                                                                 luceneDocument 
= (LuceneDocument) queue.poll(); 
                                                                 
                                                                 if 
(luceneDocument==null) {
                                                                                
        logger.debug("index info is null");
                                                                         }
                                                                 
                                                                 if 
(luceneDocument==EXIT_REQ) {
                                                                                
logger.debug("index exit req received. exiting (2)");
                                                                                
        exit = true;
                                                                                
        break;
                                                                          }
                                                         }      
                                                          
                                                }
                                                threadPool.shutdown();
                                                
threadPool.awaitTermination(30,TimeUnit.MINUTES);
                                                if (pushbacks.size()>0) {
                                                         
                                                          for (LuceneDocument 
pushback : pushbacks) {
                                                                        try {
                                                                                
writer.addDocument(pushback.getDocument());
                                                                        } catch 
(IOException io) {
                                                                                
logger.error("failed to add document to index:"+io.getMessage(),io);
                                                                        } catch 
(AlreadyClosedException e) {
                                                                                
pushbacks.add(pushback);
                                                                } 
                                                                        i++;
                                                          }
                                                }
                                                try {
                                                        writer.commit(); 
                                                } catch (Exception e) {
                                                        logger.error("failed to 
commit index:"+e.getMessage(),e);
                                                        closeIndex();
                                                        openIndex();
                                                }
                                                
                                        } catch (Throwable ie) {
                                                logger.error("index write 
interrupted:"+ie.getMessage(),ie);
                                        } finally {
                                                
                                                  indexLock.unlock();
                                        }
                                }       
                        }
                        
                        public class IndexDocument extends Thread {
                                
                                LuceneDocument luceneDocument = null;
                                List<LuceneDocument> pushbacks = null;
                                
                                        public IndexDocument(LuceneDocument 
luceneDocument,List<LuceneDocument> pushbacks) {
                                                this.luceneDocument = 
luceneDocument;
                                                this.pushbacks = pushbacks;
                                                setName("index document");
                                        }
                                
                                        public void run() {
                                                try {
                                                        
writer.addDocument(luceneDocument.getDocument());
                                                } catch (IOException io) {
                                                        logger.error("failed to 
add document to index:"+io.getMessage(),io);
                                                } catch (AlreadyClosedException 
e) {
                                                        
pushbacks.add(luceneDocument);
                                        } catch (Throwable t) {
                                                logger.error("failed to add 
document to index:"+t.getMessage(),t);
                                        }
                            }};
                        }
      
                protected void closeIndex() {
                        try {
                                indexLock.lock();
                                if (writer!=null) {
                                writer.close();
                                }
                         } catch (Throwable io) {
                                logger.error("failed to close index 
writer:"+io.getMessage(),io);
                         } finally {
                                 logger.debug("writer closed");
                                 writer = null;
                                 indexLock.unlock();
                         }
                }
        
                
                
                public void optimize() throws MessageSearchException {
                          logger.debug("optimize volume");
                          try {
                                  indexLock.lock();
                                  try {
                                          writer.optimize(false);
                                  } catch (Exception io) {
                                          throw new 
MessageSearchException("failed to optimize the 
index:"+io.getMessage(),io,logger);
                                  }
                          } catch (Throwable t) { // diskspace problems could 
arise
                                  logger.error("failed to optimize 
index:"+t.getMessage(),t);
                          } finally {
                                  indexLock.unlock();
                          }
                        
                }
                public void deleteDocs(Term[] terms) throws 
MessageSearchException {
                          logger.debug("delete docs");
                          try {
                                  indexLock.lock();
                                  openIndex();
                                  try {
                                          writer.deleteDocuments(terms);
                                  } catch (Exception e) {
                                          throw new 
MessageSearchException("failed to delete doc from 
index:"+e.getMessage(),e,logger);
                                  } finally {
                                          try {
                                                  writer.commit();
                                                  writer.expungeDeletes(false);
                                          } catch (Exception io) {
                                                  throw new 
MessageSearchException("failed to expunge docs from 
index:"+io.getMessage(),io,logger);
                                          }
                                  }
                          } catch (Throwable t) {
                                  logger.error("failed to delete docs from 
index."+t.getMessage(),t);       
                      } finally {
                                  indexLock.unlock();
                          }
                }
                
                
                  public void deleteIndex() throws MessageSearchException {
                                 logger.debug("delete index 
{indexpath='"+indexPath+"'}");
                                 try {
                                         indexLock.lock();
                                         closeIndex();
                                         try {
                                                int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                                                writer = new 
IndexWriter(FSDirectory.open(new File(indexPath)),analyzer,true,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                                         } catch (Throwable cie) {
                                                 logger.error("failed to delete 
index {index='"+indexPath+"'}",cie);
                                                 return;
                                         } 
                                } finally {
                                          openIndex();
                                          indexLock.unlock();
                                }
                  }
                
                  public void startup() throws MessageSearchException {
                        logger.debug("luceneindex is starting up");
                        File lockFile = new File(indexPath+File.separatorChar + 
"write.lock");
                        if (lockFile.exists()) {
                                if 
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                                        logger.debug("index lock file detected 
on volumeindex startup.");
                                } else {
                                        logger.warn("index lock file detected. 
the server was shutdown incorrectly. automatically deleting lock file.");
                                        lockFile.delete();
                                }
                        }
                        openIndex();
                        indexProcessor = new IndexProcessor();
                        indexProcessor.start();
                        Runtime.getRuntime().addShutdownHook(this);
                        
                  }
                  
                  public IndexReader getReader() throws IOException {
                          return writer.getReader();
                  }
                  
                  public void shutdown() {
                          //logger.debug("volumeindex is shutting down");
                          for (int i=0;i<indexThreads;i++) {
                                  queue.add(EXIT_REQ);
                          }
                          closeIndex();
                  }
                  
                  @Override
                  public void run() {
                          shutdown();
                  }
                  
                  
                  public interface LuceneDocument {
                          
                          public String toString();
                          public Document getDocument();
                          public void finalize();
                          
                  }
         
}

        


---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org

Reply via email to