knguyen 2005/05/23 21:35:16 CEST
Modified files: (Branch: JAHIA-4-1-BRANCH)
src/java/org/jahia/services/search JahiaSearchBaseService.java
ScheduledSiteIndexationJob.java
SiteIndexationJobDetail.java
Log:
- optimization :
keep Lucene segment merging low for faster indexing.
more batching when inserting and removing documents.
- unfortunately the job unscheduling service doesn't really stop the job, so we
have to add our own check.
Revision Changes Path
1.42.2.15.2.9 +302 -166
jahia/src/java/org/jahia/services/search/JahiaSearchBaseService.java
http://jahia.mine.nu:8080/cgi-bin/cvsweb.cgi/jahia/src/java/org/jahia/services/search/JahiaSearchBaseService.java.diff?r1=1.42.2.15.2.8&r2=1.42.2.15.2.9&f=h
1.1.2.4 +7 -1
jahia/src/java/org/jahia/services/search/ScheduledSiteIndexationJob.java
http://jahia.mine.nu:8080/cgi-bin/cvsweb.cgi/jahia/src/java/org/jahia/services/search/ScheduledSiteIndexationJob.java.diff?r1=1.1.2.3&r2=1.1.2.4&f=h
1.1.2.3 +10 -0
jahia/src/java/org/jahia/services/search/SiteIndexationJobDetail.java
http://jahia.mine.nu:8080/cgi-bin/cvsweb.cgi/jahia/src/java/org/jahia/services/search/SiteIndexationJobDetail.java.diff?r1=1.1.2.2&r2=1.1.2.3&f=h
Index: JahiaSearchBaseService.java
===================================================================
RCS file:
/home/cvs/repository/jahia/src/java/org/jahia/services/search/Attic/JahiaSearchBaseService.java,v
retrieving revision 1.42.2.15.2.8
retrieving revision 1.42.2.15.2.9
diff -u -r1.42.2.15.2.8 -r1.42.2.15.2.9
--- JahiaSearchBaseService.java 23 May 2005 09:11:35 -0000
1.42.2.15.2.8
+++ JahiaSearchBaseService.java 23 May 2005 19:35:15 -0000
1.42.2.15.2.9
@@ -103,11 +103,10 @@
private BeanFactory fileExtractorsFactory = null;
- private HashMap writers = new HashMap();
- private HashMap readers = new HashMap();
- private HashMap ramDirs = new HashMap();
private Cache fullSiteIndexationJobsCache;
+ private HashMap sitesLastOptimizedTime = new HashMap();
+
/**
* Constructor
* Client should always call getInstance() method
@@ -281,6 +280,8 @@
.getInitParameterNames();
String name = "";
String value = "";
+ int mergeFactor = -1;
+ int minMergeDocs = -1;
while ( enum.hasMoreElements() ){
name = (String)enum.nextElement();
if ( name.startsWith("org.apache.lucene") ){
@@ -288,9 +289,27 @@
.getInitParameter(name);
if ( value != null && !"".equals(value.trim()) ){
System.setProperty(name, value);
+ if ("org.apache.lucene.mergeFactor".equals(name) ){
+ try {
+ mergeFactor = Integer.parseInt(value);
+ } catch ( Throwable t ){
+ }
+ }
+ if ("org.apache.lucene.minMergeDocs".equals(name) ){
+ try {
+ minMergeDocs = Integer.parseInt(value);
+ } catch ( Throwable t ){
+ }
+ }
}
}
}
+ if ( mergeFactor == -1 ){
+ System.setProperty("org.apache.lucene.mergeFactor",
String.valueOf(50));
+ }
+ if ( minMergeDocs == -1 ){
+ System.setProperty("org.apache.lucene.minMergeDocs",
String.valueOf(1000));
+ }
}
public synchronized void shutdown() throws JahiaException {
@@ -756,10 +775,11 @@
public boolean abortSiteIndexation (int siteID) {
String jobName = "siteScheduledIndexation_" + String.valueOf(siteID)
+ "_Job";
String triggerName = "siteScheduledIndexation_" +
String.valueOf(siteID) + "_Trigger";
- JobDetail job = (JobDetail)fullSiteIndexationJobsCache.get(jobName);
+ SiteIndexationJobDetail job =
(SiteIndexationJobDetail)fullSiteIndexationJobsCache.get(jobName);
boolean result = true;
if ( job != null ) {
try {
+ job.setKilled(true);
ServicesRegistry.getInstance().getSchedulerService().unscheduleJob(triggerName,
Scheduler.DEFAULT_GROUP);
} catch (JahiaException je) {
@@ -926,20 +946,18 @@
*/
public void run () {
- IndexWriter writer = null;
- IndexReader reader = null;
- RAMDirectory ramDir = null;
+ SiteIndexer siteIndexer = null;
Integer siteId = null;
- ParamBean jParams = null;
+ HashMap siteIndexers = new HashMap();
while (indexingThreadActivated) {
Vector v = new Vector();
- Vector lowPriorityQueue = new Vector();
HashMap toBeAdded = new HashMap();
HashMap toBeRemoved = new HashMap();
JahiaIndexableDocument doc = null;
+
synchronized (this) {
// 1.Separate docs that are going to be added or removed
@@ -980,76 +998,72 @@
long indexingStartTime = System.currentTimeMillis();
int indexOrderCount = v.size ();
- int docCount = 0;
while (v.size() > 0) {
JahiaIndexableDocument nextObject = null;
-
if (v.size() != 0) {
nextObject = (JahiaIndexableDocument)
v.elementAt(0);
v.remove(0);
}
- if ( docCount>400 ){ // limit before moving to File system
- closeCachedWriter(siteId);
- docCount = 0;
- }
-
// okay now we have the next added/removed field, we process
it!
if (nextObject != null) {
siteId = new Integer(nextObject.getSiteId());
- if (nextObject.toBeAdded()) {
- // close cached reader first
- closeCachedReader(siteId);
-
- writer = (IndexWriter)writers.get(siteId);
- if ( writer == null ){
- ramDir = new RAMDirectory();
- try {
- writer = new IndexWriter(ramDir,
this.analyzer,
- true);
- // to optimize indexation, we must set use
compound file to false
- writer.setUseCompoundFile(false);
- writer.minMergeDocs = 1000;
- writer.mergeFactor = 50;
- //writer.maxMergeDocs = 10000;
- writers.put(siteId,writer);
- ramDirs.put(siteId,ramDir);
- } catch ( Throwable t ){
- logger.debug(t);
- }
- docCount = 0;
- }
- if ( writer != null ){
- backgroundAddObjectToSearchEngine(nextObject,
writer, null);
- docCount++;
+ siteIndexer = (SiteIndexer)siteIndexers.get(siteId);
+ try {
+ if ( siteIndexer == null ){
+ siteIndexer = new
SiteIndexer(siteId.intValue(),1000);
+ siteIndexers.put(siteId,siteIndexer);
}
+ } catch ( Throwable t ){
+ logger.debug("Error occured indexing in background",
t);
}
- else {
- // close cached writer first
- closeCachedWriter(siteId);
- docCount = 0;
-
backgroundRemoveObjectFromSearchEngine(nextObject,reader);
+ if ( siteIndexer != null ) {
+ try {
+ siteIndexer.addDocument(nextObject);
+ } catch ( Throwable t ){
+ logger.debug("Error addind document to
SiteIndexer siteId="
+ + siteId.intValue(), t);
+ }
}
this.indexOrdersCache.remove(getCacheKey(nextObject));
}
}
- Iterator iterator = readers.keySet().iterator();
- while ( iterator.hasNext() ){
- siteId = (Integer)iterator.next();
- closeCachedReader(siteId);
- }
- iterator = writers.keySet().iterator();
- while ( iterator.hasNext() ){
- siteId = (Integer)iterator.next();
- closeCachedWriter(siteId);
- }
- this.writers = new HashMap();
- this.readers = new HashMap();
- this.ramDirs = new HashMap();
+ long now = System.currentTimeMillis();
+ Iterator iterator = siteIndexers.values().iterator();
+ while (iterator.hasNext()) {
+ siteIndexer = (SiteIndexer) iterator.next();
+ try {
+ siteIndexer.storeInPersistance();
+ int pendingDocSize = 0;
+ synchronized(this) {
+ pendingDocSize = indexOrders.size();
+ }
+ if ( pendingDocSize == 0 ){
+ Long lastOptimizedTime =
+ (Long)this.sitesLastOptimizedTime
+ .get(new Integer(siteIndexer.getSiteId()));
+ if ( lastOptimizedTime == null ){
+ lastOptimizedTime = new Long(System.
+ currentTimeMillis());
+ this.sitesLastOptimizedTime.put(
+ new
Integer(siteIndexer.getSiteId()),lastOptimizedTime);
+ }
+ if ( (now - lastOptimizedTime.longValue()) > 300000
){
+ this.optimizeIndex(siteIndexer.getSiteId());
+ this.sitesLastOptimizedTime.put(
+ new Integer(siteIndexer.getSiteId()),new
Long(now));
+ }
+ }
+ } catch (Throwable t) {
+ logger.debug(
+ "Error calling storeInPersistance on siteIndexer",
+ t);
+ }
+ }
long indexingElapsedTime = System.currentTimeMillis() -
indexingStartTime;
if (logger.isInfoEnabled()) {
@@ -1085,74 +1099,12 @@
//
//**************************************************************************
- /**
- * close cached writer in writers map.
- *
- * @param siteId Integer, the siteId
- */
- private void closeCachedWriter(Integer siteId){
- if ( siteId == null ){
- return;
- }
- IndexWriter writer = (IndexWriter)writers.get(siteId);
- RAMDirectory ramDir = (RAMDirectory)ramDirs.get(siteId);
- if ( ramDir != null ){
- //move InMemory indexes to FS
- ramDir.close();
- IndexWriter fsWriter = null;
- try {
- fsWriter = this.getIndexWriter(siteId.intValue(),
- this.analyzer, false);
- } catch ( Throwable t ){
- logger.debug(t);
- }
- if ( fsWriter == null ){
- try {
- fsWriter = this.getIndexWriter(siteId.intValue(),
- this.analyzer, true);
- } catch ( Throwable t ){
- logger.debug(t);
- }
- }
- if ( fsWriter != null ){
- try {
- fsWriter.addIndexes(new Directory[] {ramDir});
- } catch ( Throwable t ){
- logger.debug(t);
- } finally {
- this.closeIndexWriter(fsWriter);
- }
- }
- } else {
- this.closeIndexWriter(writer);
- }
- writers.remove(siteId);
- ramDirs.remove(siteId);
- ramDir = null;
- writer = null;
- }
-
- /**
- * close cached reader in readers map.
- *
- * @param siteId Integer, the siteId
- */
- private void closeCachedReader(Integer siteId){
- if ( siteId == null ){
- return;
- }
- IndexReader reader = (IndexReader)readers.get(siteId);
- this.closeIndexReader(reader);
- readers.remove(siteId);
- reader = null;
- }
-
//--------------------------------------------------------------------------
/**
* index a JahiaIndexable object
*
* @param indObj
- * @param writer, if null, a new writer will be opened.
+ * @param writer, if null, a new ramWriter will be opened.
*/
private void index (JahiaIndexableDocument indObj, IndexWriter writer,
IndexReader reader) {
@@ -1161,43 +1113,10 @@
long startTime = System.currentTimeMillis();
- Document doc = new Document ();
- Locale locale = Locale.getDefault();
- String[] langCodes = (String[])indObj.getFields()
- .get(JahiaSearchConstant.FIELD_LANGUAGE_CODE);
- String langCode = null;
- if ( langCodes != null && langCodes.length>0 ){
- langCode = langCodes[0];
- }
- if ( langCodes != null &&
!ContentObject.SHARED_LANGUAGE.equals(langCode) ){
- locale =
org.jahia.utils.LanguageCodeConverters.languageCodeToLocale(langCode);
- } else {
- locale = null;
- }
- Hashtable attributes = indObj.getFields ();
- if (attributes != null && attributes.size () > 0) {
- Enumeration keys = attributes.keys ();
- String[] vals = null;
- int count = 0;
- while (keys.hasMoreElements ()) {
- String key = (String) keys.nextElement ();
- vals = (String[]) attributes.get (key);
- count = vals.length;
- for (int i = 0; i < count; i++) {
- String val = (String) vals[i];
- if ( locale != null && val != null ){
- val = val.toLowerCase(locale);
- }
- if (!indObj.isFieldUnStored (key)) {
- doc.add (Field.Keyword (key, val));
- } else {
- doc.add (Field.UnStored (key, val));
- }
- }
- }
+ Document doc = this.getLuceneDocument(indObj);
+ if ( doc == null ){
+ return;
}
- doc.add (Field.Keyword (indObj.getKeyFieldName (),
- indObj.getKey ()));
// first remove previous entry
Term term = new Term (indObj.getKeyFieldName (), indObj.getKey ());
@@ -1226,7 +1145,7 @@
this.analyzer, true);
if (writer == null) {
- logger.warn ("The index writer is null, abort indexing
the object");
+ logger.warn ("The index ramWriter is null, abort
indexing the object");
return;
}
closeWriter = true;
@@ -1253,14 +1172,67 @@
}
}
+
+
//--------------------------------------------------------------------------
+ /**
+ * Return a ready for indexation lucene Document
+ *
+ * @param indObj
+ */
+ protected Document getLuceneDocument (JahiaIndexableDocument indObj)
+ {
+
+ if (indObj == null)
+ return null;
+
+ Document doc = new Document ();
+ Locale locale = Locale.getDefault();
+ String[] langCodes = (String[])indObj.getFields()
+ .get(JahiaSearchConstant.FIELD_LANGUAGE_CODE);
+ String langCode = null;
+ if ( langCodes != null && langCodes.length>0 ){
+ langCode = langCodes[0];
+ }
+ if ( langCodes != null &&
!ContentObject.SHARED_LANGUAGE.equals(langCode) ){
+ locale =
org.jahia.utils.LanguageCodeConverters.languageCodeToLocale(langCode);
+ } else {
+ locale = null;
+ }
+ Hashtable attributes = indObj.getFields ();
+ if (attributes != null && attributes.size () > 0) {
+ Enumeration keys = attributes.keys ();
+ String[] vals = null;
+ int count = 0;
+ while (keys.hasMoreElements ()) {
+ String key = (String) keys.nextElement ();
+ vals = (String[]) attributes.get (key);
+ count = vals.length;
+ for (int i = 0; i < count; i++) {
+ String val = (String) vals[i];
+ if ( locale != null && val != null ){
+ val = val.toLowerCase(locale);
+ }
+ if (!indObj.isFieldUnStored (key)) {
+ doc.add (Field.Keyword (key, val));
+ } else {
+ doc.add (Field.UnStored (key, val));
+ }
+ }
+ }
+ }
+ doc.add (Field.Keyword (indObj.getKeyFieldName (),
+ indObj.getKey ()));
+ return doc;
+ }
+
//--------------------------------------------------------------------------
/**
* Indexes a field with a given IndexWriter
- * Don't forget to close the index writer to flush change to the index
file!
+ * Don't forget to close the index ramWriter to flush change to the
index file!
*
* @param JahiaField aField, the field to index.
* @param workflowState
- * @param IndexWriter writer, the index writer to use.
+ * @param IndexWriter ramWriter, the index ramWriter to use.
*/
private void indexField (JahiaField aField, int workflowState,
IndexWriter writer, IndexReader reader)
@@ -1375,13 +1347,13 @@
//--------------------------------------------------------------------------
/**
* Returns the IndexWriter for a given site.
- * Don't forget to close the returned index writer to flush change to
the index file !
+ * Don't forget to close the returned index ramWriter to flush change to
the index file !
*
* @param int siteID, the site id.
* @param Analyzer the analyzer to use.
* @param boolean if true, create a new index and replace existing one.
*
- * @return IndexWriter writer, the IndexWriter, null on error.
+ * @return IndexWriter ramWriter, the IndexWriter, null on error.
*/
private IndexWriter getIndexWriter (int siteID,
Analyzer analyzer,
@@ -1400,7 +1372,7 @@
site = null;
} catch (Throwable t) {
logger.error (
- "An IO Exception occured when retrieving the index
writer for directory :" + indexDir,
+ "An IO Exception occured when retrieving the index
ramWriter for directory :" + indexDir,
t);
}
}
@@ -1460,7 +1432,7 @@
/**
* Close a IndexWriter
*
- * @param IndexWriter writer, the index writer
+ * @param IndexWriter ramWriter, the index ramWriter
*/
private void closeIndexWriter (IndexWriter writer) {
if (writer == null)
@@ -1469,7 +1441,7 @@
try {
writer.close ();
} catch (Throwable t) {
- logger.error ("Error while closing index writer:", t);
+ logger.error ("Error while closing index ramWriter:", t);
}
}
@@ -1600,4 +1572,168 @@
return null;
}
+ protected class SiteIndexer {
+
+ private int UNDEFINED = 0;
+ private int ADD = 1;
+ private int REMOVE = 2;
+
+ private int siteId;
+
+ private int maxDocs = 1000;
+ private Vector docs;
+ private int lastOperation = 0;
+
+ public SiteIndexer(int siteId, int maxDocs)
+ throws IOException, JahiaException {
+ this.siteId = siteId;
+ this.maxDocs = maxDocs;
+ this.docs = new Vector();
+ }
+
+ public synchronized void addDocument(JahiaIndexableDocument doc)
+ throws IOException, JahiaException {
+ if ( doc == null ){
+ return;
+ }
+ int requestOp = doc.toBeAdded()?ADD:REMOVE;
+ if ( this.getLastOperation() == UNDEFINED ) {
+ this.setLastOperation(requestOp);
+ docs.add(doc);
+ } else if ( this.getLastOperation() != requestOp ){
+ storeInPersistance();
+ this.setLastOperation(requestOp);
+ docs.add(doc);
+ } else {
+ docs.add(doc);
+ }
+
+ if ( docs.size() > maxDocs ){
+ storeInPersistance();
+ }
+ }
+
+ public synchronized void storeInPersistance()
+ throws IOException, JahiaException {
+ if ( docs.size() ==0 ) {
+ this.setLastOperation(UNDEFINED);
+ return;
+ }
+ Vector luceneDocs = new Vector();
+ Document luceneDoc = null;
+ int size = docs.size();
+ JahiaIndexableDocument doc = null;
+ for ( int i=0; i<size; i++ ){
+ doc = (JahiaIndexableDocument)docs.get(i);
+ luceneDoc = getLuceneDocument(doc);
+ if ( luceneDoc != null ){
+ luceneDocs.add(luceneDoc);
+ }
+ }
+
+ if ( this.getLastOperation() == ADD ) {
+ IndexReader reader = null;
+ try {
+ reader = getIndexReader(siteId);
+ size = luceneDocs.size();
+ doc = null;
+ for ( int i=0; i<size; i++ ){
+ doc = (JahiaIndexableDocument)docs.get(i);
+ try {
+ Term term = new Term(doc.getKeyFieldName(),
+ doc.getKey());
+ reader.delete(term);
+ } catch ( Throwable t ){
+ logger.debug("Error removing document from
index",t);
+ }
+ }
+ } catch ( Throwable t ) {
+ logger.debug("Error removing doc from index", t);
+ } finally {
+ closeIndexReader(reader);
+ }
+
+ IndexWriter fsWriter = null;
+ try {
+ fsWriter = getIndexWriter(siteId,analyzer,false);
+ if ( fsWriter == null ){
+ fsWriter = getIndexWriter(siteId,analyzer,true);
+ }
+ if ( fsWriter != null ){
+ SiteIndexationJobDetail job =
getSiteIndexationJob(siteId);
+ if ( (job != null && !job.isDone()) ){
+ if ( fsWriter.minMergeDocs < 1000 ){
+ fsWriter.minMergeDocs = 1000;
+ }
+ if ( fsWriter.mergeFactor < 30 ){
+ fsWriter.mergeFactor = 30;
+ }
+ }
+ luceneDoc = null;
+ size = luceneDocs.size();
+ for ( int i=0; i<size; i++ ){
+ luceneDoc = (Document)luceneDocs.get(i);
+ if (luceneDoc != null){
+ fsWriter.addDocument(luceneDoc);
+ }
+ }
+ }
+ } catch ( Throwable t ) {
+ logger.debug("Error adding doc from index", t);
+ } finally {
+ closeIndexWriter(fsWriter);
+ }
+ } else if ( this.getLastOperation() == REMOVE ) {
+ IndexReader reader = getIndexReader(siteId);
+ size = docs.size();
+ doc = null;
+ for ( int i=0; i<size; i++ ){
+ doc = (JahiaIndexableDocument)docs.get(i);
+ backgroundRemoveObjectFromSearchEngine(doc,reader);
+ doc = null;
+ }
+ closeIndexReader(reader);
+ }
+ this.docs = new Vector();
+ this.setLastOperation(UNDEFINED);
+ }
+
+ public int getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(int siteId) {
+ this.siteId = siteId;
+ }
+
+ public int getMaxDocs() {
+ return maxDocs;
+ }
+
+ public void setMaxDocs(int maxDocs) {
+ this.maxDocs = maxDocs;
+ }
+
+ public synchronized int getNbDocs() {
+ return docs.size();
+ }
+
+ public synchronized Vector getDocs() {
+ return docs;
+ }
+
+ public synchronized void setDocs(Vector docs) {
+ this.docs = docs;
+ }
+
+ public int getLastOperation() {
+ return lastOperation;
+ }
+
+ public void setLastOperation(int lastOperation) {
+ this.lastOperation = lastOperation;
+ }
+
+ }
+
}
Index: ScheduledSiteIndexationJob.java
===================================================================
RCS file:
/home/cvs/repository/jahia/src/java/org/jahia/services/search/Attic/ScheduledSiteIndexationJob.java,v
retrieving revision 1.1.2.3
retrieving revision 1.1.2.4
diff -u -r1.1.2.3 -r1.1.2.4
--- ScheduledSiteIndexationJob.java 20 May 2005 11:45:50 -0000 1.1.2.3
+++ ScheduledSiteIndexationJob.java 23 May 2005 19:35:15 -0000 1.1.2.4
@@ -77,7 +77,13 @@
int size = listFieldID.size();
for (int i = 0; i < size; i++) {
int fieldID = ( (Integer) listFieldID.get(i)).intValue();
- while ( sReg.getJahiaSearchService().getNbDocumentsInQueue()>300
) {
+ while (
sReg.getJahiaSearchService().getNbDocumentsInQueue()>1000 ) {
+ if ( jobDetail.isKilled() ) {
+ jobDetail.setDone(false);
+ jobDetail.setEndTime(new Date().getTime());
+
jobDetail.setStatus(SiteIndexationJobDetail.ERROR_STATUS);
+ return;
+ }
try {
Thread.sleep(2000);
} catch ( java.lang.InterruptedException ie ){
Index: SiteIndexationJobDetail.java
===================================================================
RCS file:
/home/cvs/repository/jahia/src/java/org/jahia/services/search/Attic/SiteIndexationJobDetail.java,v
retrieving revision 1.1.2.2
retrieving revision 1.1.2.3
diff -u -r1.1.2.2 -r1.1.2.3
--- SiteIndexationJobDetail.java 20 May 2005 09:32:17 -0000 1.1.2.2
+++ SiteIndexationJobDetail.java 23 May 2005 19:35:15 -0000 1.1.2.3
@@ -22,6 +22,8 @@
private boolean done = false;
+ private boolean killed = false;
+
private int status = SUCCESS_STATUS;
private int siteId = 0;
@@ -77,5 +79,13 @@
public void setStatus(int status) {
this.status = status;
}
+
+ public boolean isKilled() {
+ return killed;
+ }
+
+ public void setKilled(boolean killed) {
+ this.killed = killed;
+ }
}