Hi Michael
Thanks for your help. Here are the stacks:
index processor [TIME_WAITING] CPU time: 33:01
java.lang.Object.wait(long)
org.apache.lucene.index.IndexWriter.doWait()
org.apache.lucene.index.IndexWriter.shouldClose()
org.apache.lucene.index.IndexWriter.close(boolean)
org.apache.lucene.index.IndexWriter.close()
com.stimulus.archiva.index.VolumeIndex.closeIndex()
com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()
The source code to our indexer is attached. As you can see, documents
are added to a blocking queue. The index processor thread takes them out
of the queue and processes them. After about 60k documents, IndexWriter's
close method enters TIME_WAITING indefinitely. Is there any workaround
to this problem?
package com.stimulus.archiva.index;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import javax.mail.MessagingException;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.domain.Email;
import com.stimulus.archiva.domain.EmailID;
import com.stimulus.archiva.domain.Indexer;
import com.stimulus.archiva.domain.Volume;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;
public class VolumeIndex extends Thread {
// Bounded hand-off queue: indexMessage() puts prepared documents here and the
// IndexProcessor thread takes them off. Capacity is set in the constructor.
protected ArrayBlockingQueue<IndexInfo> queue;
protected static final Log logger =
LogFactory.getLog(VolumeIndex.class.getName());
// Currently open Lucene writer, or null while the index is closed
// (openIndex() assigns it, closeIndex() resets it to null).
IndexWriter writer = null;
// Volume whose index path this instance reads and writes.
Volume volume;
// NOTE(review): never assigned in this file, yet shutdown() calls
// scheduler.shutdownNow() - confirm who initialises it.
protected static ScheduledExecutorService scheduler;
// NOTE(review): not referenced anywhere in this file.
protected static ScheduledFuture<?> scheduledTask;
// Sentinel queue entry; IndexProcessor.run() compares against it by identity
// and stops when it is received.
protected static IndexInfo EXIT_REQ = new IndexInfo(null);
// Held around every open/use/close cycle of 'writer' (deleteMessages,
// deleteIndex and IndexProcessor.run).
ReentrantLock indexLock = new ReentrantLock();
// Analyzer handed to every IndexWriter this class creates.
ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
Indexer indexer = null;
// Per-volume log file and the stream over it; the stream is installed as the
// IndexWriter info stream when debug logging is enabled (see openIndex()).
File indexLogFile;
PrintStream indexLogOut;
// Background consumer thread, created and started by startup().
IndexProcessor indexProcessor;
/**
 * Creates the index wrapper for one volume: allocates the bounded document
 * queue, opens the per-volume index log stream and then calls
 * {@link #startup()}, which starts the background index processor.
 *
 * @param indexer indexer used to prepare the index and to build documents
 * @param volume  volume whose index this instance manages
 */
public VolumeIndex(Indexer indexer, Volume volume) {
    logger.debug("creating new volume index {"+volume+"}");
    this.volume = volume;
    this.indexer = indexer;
    this.queue = new ArrayBlockingQueue<IndexInfo>(
            Config.getConfig().getIndex().getIndexBacklog());
    try {
        indexLogFile = getIndexLogFile(volume);
        if (indexLogFile != null) {
            // NOTE(review): PrintStream(File) truncates an existing file, so
            // this size check has no practical effect - confirm whether the
            // log was meant to be appended to instead.
            if (indexLogFile.length() > 10485760) {
                indexLogFile.delete();
            }
            indexLogOut = new PrintStream(indexLogFile);
            // Logged only when a file exists: getIndexLogFile() returns null
            // on failure (and has already logged the error), so dereferencing
            // it unconditionally raised a NullPointerException that was then
            // reported as a second, misleading "failed to open" error.
            logger.debug("set index log file path {path='"+indexLogFile.getCanonicalPath()+"'}");
        }
    } catch (Exception e) {
        logger.error("failed to open index log file:"+e.getMessage(),e);
    }
    startup();
}
protected File getIndexLogFile(Volume volume) {
try {
String indexpath = volume.getIndexPath();
int lio = indexpath.lastIndexOf(File.separator)+1;
String logfilepath =
indexpath.substring(lio,indexpath.length()-1);
logfilepath += ".log";
logfilepath = "index_"+logfilepath;
logfilepath =
Config.getFileSystem().getLogPath()+File.separator+logfilepath;
return new File(logfilepath);
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
return null;
}
}
public void deleteMessages(List<String> ids) throws
MessageSearchException {
if (ids == null)
throw new MessageSearchException("assertion failure:
null ids",logger);
Term[] terms = new Term[ids.size()];
int c = 0;
StringBuffer deleteInfo = new StringBuffer();
for (String id : ids) {
terms[c++] = new Term("uid",id);
deleteInfo.append(id);
deleteInfo.append(",");
}
String deleteStr = deleteInfo.toString();
if (deleteStr.length()>0 &&
deleteStr.charAt(deleteStr.length()-1)==',')
deleteStr = deleteStr.substring(0,deleteStr.length()-1);
logger.debug("delete messages {'"+deleteInfo+"'}");
try {
indexLock.lock();
openIndex();
try {
writer.deleteDocuments(terms);
writer.expungeDeletes();
} catch (Exception e) {
throw new MessageSearchException("failed to delete
email from index.",e,logger);
} finally {
}
} finally {
closeIndex();
indexLock.unlock();
}
}
protected void openIndex() throws MessageSearchException {
Exception lastError = null;
if (writer==null) {
logger.debug("openIndex() index will be opened. it is
currently closed.");
} else {
logger.debug("openIndex() did not bother opening index.
it is already open.");
return;
}
logger.debug("opening index for write {"+volume+"}");
indexer.prepareIndex(volume);
logger.debug("opening search index for write
{indexpath='"+volume.getIndexPath()+"'}");
boolean writelock;
int attempt = 0;
int maxattempt = 10;
if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
maxattempt = 10000;
} else {
maxattempt = 10;
}
do {
writelock = false;
try {
FSDirectory fsDirectory =
FSDirectory.getDirectory(volume.getIndexPath());
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(fsDirectory,analyzer,new
IndexWriter.MaxFieldLength(maxIndexChars));
if (logger.isDebugEnabled() && indexLogOut!=null) {
writer.setInfoStream(indexLogOut);
}
} catch (LockObtainFailedException lobfe) {
logger.debug("write lock on index. will reopen
in 50ms.");
try { Thread.sleep(50); } catch (Exception e) {}
attempt++;
writelock = true;
} catch (CorruptIndexException cie) {
throw new MessageSearchException("index appears to
be corrupt. please reindex the active volume."+cie.getMessage(),logger);
} catch (IOException io) {
throw new MessageSearchException("failed to write
document to index:"+io.getMessage(),logger);
}
} while (writelock && attempt<maxattempt);
if (attempt>=10000)
throw new MessageSearchException("failed to open index
writer {location='"+volume.getIndexPath()+"'}",lastError,logger);
}
public void indexMessage(Email message) throws
MessageSearchException {
logger.debug("index message {"+message+"}");
long s = (new Date()).getTime();
if (message == null)
throw new MessageSearchException("assertion failure:
null message",logger);
Document doc = new Document();
IndexInfo indexInfo = new IndexInfo(doc);
try {
DocumentIndex docIndex = new DocumentIndex(indexer);
String language = doc.get("lang");
if (language==null)
language = indexer.getIndexLanguage();
docIndex.write(message,doc,indexInfo);
queue.put(indexInfo);
logger.debug("message indexed successfully
{"+message+",language='"+language+"'}");
} catch (MessagingException me) {
throw new MessageSearchException("failed to decode
message during indexing",me,logger, ChainedException.Level.DEBUG);
} catch (IOException me) {
throw new MessageSearchException("failed to index
message"+me.getMessage()+" {"+message+"}",me,logger,
ChainedException.Level.DEBUG);
} catch (ExtractionException ee)
{
// we will want to continue indexing
//throw new MessageSearchException("failed to decode
attachments in message {"+message+"}",ee,logger,
ChainedException.Level.DEBUG);
} catch (AlreadyClosedException ace) {
indexMessage(message);
} catch (Throwable e) {
throw new MessageSearchException("failed to index
message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
}
logger.debug("indexing message end {"+message+"}");
long e = (new Date()).getTime();
logger.debug("indexing time {time='"+(e-s)+"'}");
}
public class IndexProcessor extends Thread {
public IndexProcessor() {
setName("index processor");
}
public void run() {
boolean exit = false;
//ExecutorService documentPool;
// we abandoned pool as it does not seem to offer any
major performance benefit
IndexInfo indexInfo = null;
LinkedList<IndexInfo> pushbacks = new
LinkedList<IndexInfo>();
while (!exit) {
try {
int maxIndexDocs =
Config.getConfig().getIndex().getMaxSimultaneousDocs();
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
indexInfo = null;
indexInfo = (IndexInfo) queue.take();
if (indexInfo==EXIT_REQ) {
logger.debug("index exit req received.
exiting");
exit = true;
continue;
}
indexLock.lock();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
if (indexInfo==null) {
logger.debug("index info is null");
}
int i = 0;
while(indexInfo!=null && i<maxIndexDocs) {
try {
writer.addDocument(indexInfo.getDocument());
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(indexInfo);
} finally {
indexInfo.cleanup();
}
//documentPool.execute(new
IndexDocument(indexInfo,pushbacks));
i++;
if (i<maxIndexDocs) {
indexInfo = (IndexInfo) queue.poll();
if (indexInfo==null) {
logger.debug("index info is null");
}
if (indexInfo==EXIT_REQ) {
logger.debug("index exit req
received. exiting (2)");
exit = true;
break;
}
}
}
for (IndexInfo pushback : pushbacks) {
try {
writer.addDocument(pushback.getDocument());
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(indexInfo);
} finally {
indexInfo.cleanup();
}
//documentPool.execute(new
IndexDocument(pushback,pushbacks));
i++;
}
//documentPool.shutdown();
//documentPool.awaitTermination(30,TimeUnit.MINUTES);
} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage());
} finally {
closeIndex();
indexLock.unlock();
}
}
}
public class IndexDocument extends Thread {
IndexInfo indexInfo = null;
List<IndexInfo> pushbacks = null;
public IndexDocument(IndexInfo
indexInfo,List<IndexInfo> pushbacks) {
this.indexInfo = indexInfo;
this.pushbacks = pushbacks;
setName("index document");
}
public void run() {
try {
writer.addDocument(indexInfo.getDocument());
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(indexInfo);
}
}};
}
protected void closeIndex() {
try {
if (writer!=null) {
writer.close();
logger.debug("writer closed");
writer = null;
}
} catch (Exception io) {
logger.error("failed to close index
writer:"+io.getMessage(),io);
}
}
public void deleteIndex() throws MessageSearchException {
logger.debug("delete index
{indexpath='"+volume.getIndexPath()+"'}");
try {
indexLock.lock();
try {
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new
IndexWriter.MaxFieldLength(maxIndexChars));
} catch (Exception cie) {
logger.error("failed to delete index
{index='"+volume.getIndexPath()+"'}",cie);
return;
}
MessageIndex.volumeIndexes.remove(this);
} finally {
closeIndex();
indexLock.unlock();
}
}
public void startup() {
logger.debug("volumeindex is starting up");
File lockFile = new
File(volume.getIndexPath()+File.separatorChar + "write.lock");
if (lockFile.exists()) {
logger.warn("The server lock file already exists. Either
another indexer is running or the server was not shutdown correctly.");
logger.warn("If it is the latter, the lock file must be
manually deleted at "+lockFile.getAbsolutePath());
if (indexer.getMultipleIndexProcesses()) {
logger.debug("index lock file detected on
volumeindex startup.");
} else {
logger.warn("index lock file detected. the server
was shutdown incorrectly. automatically deleting lock file.");
logger.warn("indexer is configured to deal with only
one indexer process.");
logger.warn("if you are running more than one
indexer, your index could be subject to corruption.");
lockFile.delete();
}
}
indexProcessor = new IndexProcessor();
indexProcessor.start();
Runtime.getRuntime().addShutdownHook(this);
}
/**
 * Asks the index processor to stop by queueing {@code EXIT_REQ}, then shuts
 * down the shared scheduler if one has been created.
 */
public void shutdown() {
    logger.debug("volumeindex is shutting down");
    try {
        // add() throws IllegalStateException when the bounded queue is full,
        // which also skipped the scheduler shutdown below. Wait a bounded
        // time for the processor to make room instead.
        if (!queue.offer(EXIT_REQ, 30, TimeUnit.SECONDS)) {
            logger.warn("index queue is full. exit request could not be delivered to the index processor.");
        }
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
    // 'scheduler' is never assigned in this file; guard against null.
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
}
/**
 * JVM shutdown hook body (this object is registered in {@code startup()}):
 * queues {@code EXIT_REQ} so the index processor stops.
 * NOTE(review): the hook does not wait for the processor to finish closing
 * the index - confirm whether a bounded join on indexProcessor is wanted.
 */
@Override
public void run() {
    try {
        // add() throws IllegalStateException when the bounded queue is full;
        // wait briefly for space instead of failing the hook.
        if (!queue.offer(EXIT_REQ, 10, TimeUnit.SECONDS)) {
            logger.warn("index queue is full. exit request could not be delivered to the index processor.");
        }
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
}
}
Is it possible a large merge is running? By default IW.close waits
for outstanding merges to complete. Can you post the stacktrace?
Mike
On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <ja...@stimulussoft.com> wrote:
Hi All
I have a long-running situation where our indexing thread is getting stuck
indefinitely in IndexWriter's close method. YourKit shows the thread to be
stuck in TIME_WAITING. Any ideas on what could be causing this?
Could it be one of the streams or readers we passed to the document?
I am running Lucene 2.9.0.
Many thanks in advance
Jamie
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org