Hi Michael

Thanks for your help. Here are the stacks:

index processor [TIME_WAITING] CPU time: 33:01
java.lang.Object.wait(long)
org.apache.lucene.index.IndexWriter.doWait()
org.apache.lucene.index.IndexWriter.shouldClose()
org.apache.lucene.index.IndexWriter.close(boolean)
org.apache.lucene.index.IndexWriter.close()
com.stimulus.archiva.index.VolumeIndex.closeIndex()
com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()

The source code to our indexer is attached. As you can see, documents are added to a blocking queue, and the index processor thread takes them off the queue and processes them. After about 60k documents, IndexWriter's close method enters TIME_WAITING indefinitely. Is there any workaround to this problem?
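One workaround we have been considering is sketched below. It assumes the hang really is close() waiting on background work, and relies on the IndexWriter.close(boolean) overload already visible in the stack above; close(false) should return without waiting for outstanding merges, though we are not sure whether abandoning merges at that point is safe for our index:

    protected void closeIndex() {
        try {
            if (writer != null) {
                // sketch only: close(false) asks Lucene not to wait for
                // running background merges before returning
                writer.close(false);
                logger.debug("writer closed");
                writer = null;
            }
        } catch (Exception io) {
            logger.error("failed to close index writer:" + io.getMessage(), io);
        }
    }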


package com.stimulus.archiva.index;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import javax.mail.MessagingException;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.domain.Email;
import com.stimulus.archiva.domain.EmailID;
import com.stimulus.archiva.domain.Indexer;
import com.stimulus.archiva.domain.Volume;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;

public class VolumeIndex extends Thread {
        protected ArrayBlockingQueue<IndexInfo> queue;
        protected static final Log logger = LogFactory.getLog(VolumeIndex.class.getName());
           IndexWriter writer = null;
           Volume volume;
           protected static ScheduledExecutorService scheduler;
        protected static ScheduledFuture<?> scheduledTask;
        protected static IndexInfo EXIT_REQ = new IndexInfo(null);
        ReentrantLock indexLock = new ReentrantLock();
        ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
        Indexer indexer = null;
        File indexLogFile;
        PrintStream indexLogOut;
        IndexProcessor indexProcessor;
public VolumeIndex(Indexer indexer, Volume volume) {
               logger.debug("creating new volume index {"+volume+"}");
               this.volume = volume;
               this.indexer = indexer;
               this.queue = new ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
               try {
                   indexLogFile = getIndexLogFile(volume);
                   if (indexLogFile!=null) {
                       if (indexLogFile.length()>10485760)
                           indexLogFile.delete();
                       indexLogOut = new PrintStream(indexLogFile);
                   }
logger.debug("set index log file path {path='"+indexLogFile.getCanonicalPath()+"'}");
               } catch (Exception e) {
logger.error("failed to open index log file:"+e.getMessage(),e);
               }
               startup();
           }
protected File getIndexLogFile(Volume volume) {
              try {
                   String indexpath = volume.getIndexPath();
                   int lio = indexpath.lastIndexOf(File.separator)+1;
String logfilepath = indexpath.substring(lio,indexpath.length()-1);
                   logfilepath += ".log";
                   logfilepath = "index_"+logfilepath;
logfilepath = Config.getFileSystem().getLogPath()+File.separator+logfilepath;
                   return new File(logfilepath);
               } catch (Exception e) {
logger.error("failed to open index log file:"+e.getMessage(),e);
                   return null;
               }
         }
public void deleteMessages(List<String> ids) throws MessageSearchException {
             if (ids == null)
throw new MessageSearchException("assertion failure: null ids",logger); Term[] terms = new Term[ids.size()];
             int c = 0;
             StringBuffer deleteInfo = new StringBuffer();
             for (String id : ids) {
                 terms[c++] = new Term("uid",id);
                 deleteInfo.append(id);
                 deleteInfo.append(",");
             }
              String deleteStr = deleteInfo.toString();
              if (deleteStr.length()>0 && deleteStr.charAt(deleteStr.length()-1)==',')
                 deleteStr = deleteStr.substring(0,deleteStr.length()-1);
logger.debug("delete messages {'"+deleteInfo+"'}");
             try {
                 indexLock.lock();
                 openIndex();
                 try {
                     writer.deleteDocuments(terms);
                     writer.expungeDeletes();
                 } catch (Exception e) {
throw new MessageSearchException("failed to delete email from index.",e,logger);
                 } finally {
}
             } finally {
                 closeIndex();
                 indexLock.unlock();
             }
       }
protected void openIndex() throws MessageSearchException {
            Exception lastError = null;
            if (writer==null) {
                logger.debug("openIndex() index will be opened. it is currently closed.");
           } else {
logger.debug("openIndex() did not bother opening index. it is already open.");
               return;
           }
           logger.debug("opening index for write {"+volume+"}");
           indexer.prepareIndex(volume);
logger.debug("opening search index for write {indexpath='"+volume.getIndexPath()+"'}");
           boolean writelock;
           int attempt = 0;
           int maxattempt = 10;
if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
               maxattempt = 10000;
            } else {
               maxattempt = 10;
            }
do {
               writelock = false;
               try {
                        FSDirectory fsDirectory = FSDirectory.getDirectory(volume.getIndexPath());
                        int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                        writer = new IndexWriter(fsDirectory,analyzer,new IndexWriter.MaxFieldLength(maxIndexChars));
                       if (logger.isDebugEnabled() && indexLogOut!=null) {
                           writer.setInfoStream(indexLogOut);
                       }
                } catch (LockObtainFailedException lobfe) {
                        logger.debug("write lock on index. will reopen in 50ms.");
                        // remember the last failure so it can be reported if we give up
                        lastError = lobfe;
                        try { Thread.sleep(50); } catch (Exception e) {}
                        attempt++;
                        writelock = true;
               } catch (CorruptIndexException cie) {
throw new MessageSearchException("index appears to be corrupt. please reindex the active volume."+cie.getMessage(),logger);
               } catch (IOException io) {
throw new MessageSearchException("failed to write document to index:"+io.getMessage(),logger);
               }
          } while (writelock && attempt<maxattempt);
           if (attempt>=maxattempt)
throw new MessageSearchException("failed to open index writer {location='"+volume.getIndexPath()+"'}",lastError,logger);
       }
public void indexMessage(Email message) throws MessageSearchException {
           logger.debug("index message {"+message+"}");
           long s = (new Date()).getTime();
           if (message == null)
throw new MessageSearchException("assertion failure: null message",logger);
           Document doc = new Document();
           IndexInfo indexInfo = new IndexInfo(doc);
           try {
              DocumentIndex docIndex = new DocumentIndex(indexer);
              String language = doc.get("lang");
              if (language==null)
                  language = indexer.getIndexLanguage();
              docIndex.write(message,doc,indexInfo);
              queue.put(indexInfo);
logger.debug("message indexed successfully {"+message+",language='"+language+"'}");
           } catch (MessagingException me) {
throw new MessageSearchException("failed to decode message during indexing",me,logger, ChainedException.Level.DEBUG);
           } catch (IOException me) {
throw new MessageSearchException("failed to index message"+me.getMessage()+" {"+message+"}",me,logger, ChainedException.Level.DEBUG);
           } catch (ExtractionException ee)
           {
               // we will want to continue indexing
//throw new MessageSearchException("failed to decode attachments in message {"+message+"}",ee,logger, ChainedException.Level.DEBUG);
           } catch (AlreadyClosedException ace) {
               indexMessage(message);
           } catch (Throwable e) {
throw new MessageSearchException("failed to index message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
           }
           logger.debug("indexing message end {"+message+"}");
           long e = (new Date()).getTime();
           logger.debug("indexing time {time='"+(e-s)+"'}");
       }
        public class IndexProcessor extends Thread {

            public IndexProcessor() {
               setName("index processor");
           }
public void run() {
               boolean exit = false;
               //ExecutorService documentPool;
                // we abandoned pool as it does not seem to offer any major performance benefit
                IndexInfo indexInfo = null;
                LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
                while (!exit) {
                    try {
                        int maxIndexDocs = Config.getConfig().getIndex().getMaxSimultaneousDocs();
                        //documentPool = Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
                        indexInfo = null;
                        indexInfo = (IndexInfo) queue.take();
                       if (indexInfo==EXIT_REQ) {
logger.debug("index exit req received. exiting");
                           exit = true;
                           continue;
                       }
indexLock.lock();
                       try {
                            openIndex();
                       } catch (Exception e) {
logger.error("failed to open index:"+e.getMessage(),e);
                            return;
                       }
                       if (indexInfo==null) {
                           logger.debug("index info is null");
                       }
                       int i = 0;
                       while(indexInfo!=null && i<maxIndexDocs) {
                           try {
                               writer.addDocument(indexInfo.getDocument());
                           } catch (IOException io) {
logger.error("failed to add document to index:"+io.getMessage(),io);
                           } catch (AlreadyClosedException e) {
                               pushbacks.add(indexInfo);
                           } finally {
                               indexInfo.cleanup();
                           }
                            //documentPool.execute(new IndexDocument(indexInfo,pushbacks));
                            i++;
                            if (i<maxIndexDocs) {
                                indexInfo = (IndexInfo) queue.poll();
if (indexInfo==null) {
                                       logger.debug("index info is null");
                                }
                                  if (indexInfo==EXIT_REQ) {
                                        logger.debug("index exit req received. exiting (2)");
                                       exit = true;
                                       break;
                                 }
                            }
                        }
                        // retry documents that previously failed with AlreadyClosedException
                        for (IndexInfo pushback : pushbacks) {
                            try {
                                writer.addDocument(pushback.getDocument());
                            } catch (IOException io) {
                                logger.error("failed to add document to index:"+io.getMessage(),io);
                            } catch (AlreadyClosedException e) {
                                // cannot re-add to pushbacks while iterating over it; log and drop
                                logger.error("failed to add pushback document, writer already closed",e);
                            } finally {
                                pushback.cleanup();
                            }
                            //documentPool.execute(new IndexDocument(pushback,pushbacks));
                            i++;
                        }
                        pushbacks.clear();
                       //documentPool.shutdown();
                        //documentPool.awaitTermination(30,TimeUnit.MINUTES);
                    } catch (Throwable ie) {
                        logger.error("index write interrupted:"+ie.getMessage());
                    } finally {
                          closeIndex();
                         indexLock.unlock();
                   }
                }
            }

            public class IndexDocument extends Thread {
                    IndexInfo indexInfo = null;
                   List<IndexInfo> pushbacks = null;
public IndexDocument(IndexInfo indexInfo,List<IndexInfo> pushbacks) {
                       this.indexInfo = indexInfo;
                       this.pushbacks = pushbacks;
                       setName("index document");
                   }
public void run() {
                       try {
                           writer.addDocument(indexInfo.getDocument());
                       } catch (IOException io) {
logger.error("failed to add document to index:"+io.getMessage(),io);
                       } catch (AlreadyClosedException e) {
                           pushbacks.add(indexInfo);
                       }
                    }
                }
        }
protected void closeIndex() {
            try {
                    if (writer!=null) {
                       writer.close();
                       logger.debug("writer closed");
                       writer = null;
                    }
                } catch (Exception io) {
logger.error("failed to close index writer:"+io.getMessage(),io);
                }
       }
        public void deleteIndex() throws MessageSearchException {
                  logger.debug("delete index {indexpath='"+volume.getIndexPath()+"'}");
                  try {
                     indexLock.lock();
                    try {
                        int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                        writer = new IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new IndexWriter.MaxFieldLength(maxIndexChars));
                    } catch (Exception cie) {
logger.error("failed to delete index {index='"+volume.getIndexPath()+"'}",cie);
                        return;
                    }
                    MessageIndex.volumeIndexes.remove(this);
                 } finally {
                       closeIndex();
                     indexLock.unlock();
               }
         }
public void startup() {
           logger.debug("volumeindex is starting up");
File lockFile = new File(volume.getIndexPath()+File.separatorChar + "write.lock");
           if (lockFile.exists()) {
logger.warn("The server lock file already exists. Either another indexer is running or the server was not shutdown correctly."); logger.warn("If it is the latter, the lock file must be manually deleted at "+lockFile.getAbsolutePath());
               if (indexer.getMultipleIndexProcesses()) {
logger.debug("index lock file detected on volumeindex startup.");
               } else {
logger.warn("index lock file detected. the server was shutdown incorrectly. automatically deleting lock file."); logger.warn("indexer is configured to deal with only one indexer process."); logger.warn("if you are running more than one indexer, your index could be subject to corruption.");
                   lockFile.delete();
               }
           }
           indexProcessor = new IndexProcessor();
           indexProcessor.start();
           Runtime.getRuntime().addShutdownHook(this);
        }

        public void shutdown() {
              logger.debug("volumeindex is shutting down");
              queue.add(EXIT_REQ);
              // guard against NPE: scheduler is never initialized in this class
              if (scheduler != null) {
                  scheduler.shutdownNow();
              }
        }

        @Override
         public void run() {
             queue.add(EXIT_REQ);
         }
}



Is it possible a large merge is running?  By default IW.close waits
for outstanding merges to complete.  Can you post the stacktrace?
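If capturing the full trace from YourKit is awkward, a rough sketch like the one below (it only uses Thread.getAllStackTraces(), so it will not show lock owners the way a jstack dump would) could be wired into the indexer and called when close() hangs:

    import java.util.Map;

    public class ThreadDumper {
        // Rough sketch: print every live thread's stack so the full trace of the
        // hung close() call (and any merge threads) can be posted to the list.
        public static void dumpAllStacks() {
            Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
            for (Map.Entry<Thread, StackTraceElement[]> entry : stacks.entrySet()) {
                Thread t = entry.getKey();
                System.err.println("Thread: " + t.getName() + " state=" + t.getState());
                for (StackTraceElement frame : entry.getValue()) {
                    System.err.println("    at " + frame);
                }
            }
        }
    }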

Mike

On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <ja...@stimulussoft.com> wrote:
Hi All

I have a long-running situation where our indexing thread is getting stuck
indefinitely in IndexWriter's close method. YourKit shows the thread to be
stuck in TIME_WAITING. Any ideas on what could be causing this?
Could it be one of the streams or readers we passed to the document?

I am running Lucene 2.9.0.

Many thanks in advance

Jamie



---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org


