Author: rwesten
Date: Mon Aug 26 10:31:13 2013
New Revision: 1517487

URL: http://svn.apache.org/r1517487
Log:
STANBOL-1128: fixed a bug where deactivating the engine while FST structures 
were being built closed low-level Solr streams. The issue was 
actually that ExecutorService#shutdownNow() interrupts worker threads, and 
FileChannels (as used by Solr) get closed if either the producing or 
receiving thread gets interrupted (for security reasons). The new implementation 
does not use shutdownNow(), but only shutdown(). The CorpusCreationTask now 
knows if creation tasks were cancelled and does not start (or, if already 
started, ignores the result). FST creation tasks that are already executing 
cannot be cancelled and will run until completed.

Modified:
    stanbol/trunk/enhancement-engines/lucenefstlinking/README.md
    
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/CorpusCreationTask.java
    
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngine.java
    
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngineComponent.java
    
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
    
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/TaggingSession.java

Modified: stanbol/trunk/enhancement-engines/lucenefstlinking/README.md
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/enhancement-engines/lucenefstlinking/README.md?rev=1517487&r1=1517486&r2=1517487&view=diff
==============================================================================
--- stanbol/trunk/enhancement-engines/lucenefstlinking/README.md (original)
+++ stanbol/trunk/enhancement-engines/lucenefstlinking/README.md Mon Aug 26 
10:31:13 2013
@@ -154,8 +154,8 @@ __Other__
 
 As the first version of the FST Linking Engine is still in active development 
their are some know issues:
 
-* Stopping an Engine while FST models are build will close the IndexReader of 
the underlaying Solr index. This will cause any followup requests to the Solr 
index to fail with Exceptions.
 * Currently FST models are not updated if the Solr index is changed. This 
means that this Engine currently only works for read-only indexes. If a Index 
is changed users will need to delete the FST file and restart the Engine to 
trigger the recreation of the FST model
 * the Japanese FieldType as specified in the 
[fst_field_types.xml](fst_field_types.xml) file does produce position 
increments != 1
+* the RefCounted EntityCache is not destroyed prior to finalise(). This means 
that at some point the reference count is not correctly dereferenced. 
 
 

Modified: 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/CorpusCreationTask.java
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/CorpusCreationTask.java?rev=1517487&r1=1517486&r2=1517487&view=diff
==============================================================================
--- 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/CorpusCreationTask.java
 (original)
+++ 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/CorpusCreationTask.java
 Mon Aug 26 10:31:13 2013
@@ -19,10 +19,9 @@ package org.apache.stanbol.enhancer.engi
 import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.util.RefCounted;
@@ -42,65 +41,70 @@ public class CorpusCreationTask implemen
     private final Logger log = 
LoggerFactory.getLogger(CorpusCreationTask.class);
     
     private final CorpusInfo fstInfo;
-    private final SolrCore core;
+    private final IndexConfiguration indexConfig;
     private final long enqueued;
     
-    public CorpusCreationTask(SolrCore core, CorpusInfo fstInfo){
-        if(core == null || fstInfo == null){
+    public CorpusCreationTask(IndexConfiguration indexConfig, CorpusInfo 
fstInfo){
+        if(indexConfig == null || fstInfo == null){
             throw new IllegalArgumentException("Pared parameters MUST NOT be 
NULL!");
         }
-        this.core = core;
+        this.indexConfig = indexConfig;
         this.fstInfo = fstInfo;
         this.enqueued = fstInfo.enqueue();
     }
     
     @Override
     public void run() {
+        if(!indexConfig.isActive()){
+            return; //task cancelled
+        }
         //check if the FST corpus was enqueued a 2nd time
         if(enqueued != fstInfo.getEnqueued()){
             return;
         }
+        SolrCore core = indexConfig.getIndex();
         if(core.isClosed()){
             log.warn("Unable to build {} becuase SolrCore {} is 
closed!",fstInfo,core.getName());
             return;
         }
+        TaggerFstCorpus corpus = null;
         RefCounted<SolrIndexSearcher> searcherRef = core.getSearcher();
-        SolrIndexSearcher searcher = searcherRef.get();
-        DirectoryReader reader = searcher.getIndexReader();
-        TaggerFstCorpus corpus;
         try {
+            SolrIndexSearcher searcher = searcherRef.get();
+            //we do get the AtomicReader, because TaggerFstCorpus will need it
+            //anyways. This prevents to create another 
SlowCompositeReaderWrapper.
+            IndexReader reader = searcher.getAtomicReader();
             log.info(" ... build FST corpus for {}",fstInfo);
-            corpus = new TaggerFstCorpus(reader, reader.getVersion(),
+            corpus = new TaggerFstCorpus(reader, 
searcher.getIndexReader().getVersion(),
                 null, fstInfo.indexedField, fstInfo.storedField, 
fstInfo.analyzer,
                 fstInfo.partialMatches,1,100);
-        } catch (Exception e) {
-            log.warn("Unable to build "+fstInfo+"!",e);
-            return;
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to read Information to 
build "
+                    + fstInfo + " from SolrIndex '" + core.getName() + "'!", 
e);
         } finally {
-//            try {
-//                reader.close();
-//            } catch (IOException e) { /* ignore */ }
-            searcherRef.decref();
+            searcherRef.decref(); //ensure that we dereference the searcher
         }
-        if(fstInfo.fst.exists()){
-            if(!FileUtils.deleteQuietly(fstInfo.fst)){
-                log.warn("Unable to delete existing FST fiel for {}",fstInfo);
+        if(indexConfig.isActive()){
+            if(fstInfo.fst.exists()){
+                if(!FileUtils.deleteQuietly(fstInfo.fst)){
+                    log.warn("Unable to delete existing FST fiel for 
{}",fstInfo);
+                }
             }
-        }
-        try {
-            corpus.save(fstInfo.fst);
-        } catch (IOException e) {
-            log.warn("Unable to store FST corpus " + fstInfo + " to "
-                    + fstInfo.fst.getAbsolutePath() + "!", e);
-        }
-        //set the created corpus to the FST Info
-        fstInfo.setCorpus(enqueued, corpus);
+            try {
+                corpus.save(fstInfo.fst);
+            } catch (IOException e) {
+                log.warn("Unable to store FST corpus " + fstInfo + " to "
+                        + fstInfo.fst.getAbsolutePath() + "!", e);
+            }
+            //set the created corpus to the FST Info
+            fstInfo.setCorpus(enqueued, corpus);
+        } //else index configuration no longer active ... ignore the built FST
     }
     
     @Override
     public String toString() {
         return new StringBuilder("Task: building ").append(fstInfo)
-                .append(" for SolrCore ").append(core.getName()).toString();
+                .append(" for SolrCore 
").append(indexConfig.getIndex().getName()).toString();
     }
 
 }

Modified: 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngine.java
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngine.java?rev=1517487&r1=1517486&r2=1517487&view=diff
==============================================================================
--- 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngine.java
 (original)
+++ 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngine.java
 Mon Aug 26 10:31:13 2013
@@ -198,7 +198,6 @@ public class FstLinkingEngine implements
             }
             log.debug("Process Matches for {} extragted Tags:",tags.size());
             int matches = match(at,tags.values());
-            //thr remaining code is logging only
             if(log.isTraceEnabled()){
                 String text = at.getSpan();
                 for(Tag tag : tags.values()){
@@ -235,6 +234,7 @@ public class FstLinkingEngine implements
         } finally {
             ci.getLock().writeLock().unlock();
         }
+        tags.clear(); //help the GC
     }
 
     private int match(AnalysedText at, Collection<Tag> tags) {

Modified: 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngineComponent.java
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngineComponent.java?rev=1517487&r1=1517486&r2=1517487&view=diff
==============================================================================
--- 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngineComponent.java
 (original)
+++ 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/FstLinkingEngineComponent.java
 Mon Aug 26 10:31:13 2013
@@ -372,6 +372,8 @@ public class FstLinkingEngineComponent {
      * @see #DEFAULT_ENTITY_CACHE_SIZE
      */
     private EntityCacheManager documentCacheFactory;
+
+    private IndexConfiguration indexConfig;
     
     /**
      * Default constructor as used by OSGI. This expects that 
@@ -461,13 +463,19 @@ public class FstLinkingEngineComponent {
         ThreadFactory tf = new 
NamedThreadFactory(engineName+"-FST-RuntimeCreation");
         //TODO: maybe use the more advanced 
         //    com.google.common.util.concurrent.ThreadFactoryBuilder
-        if(fstCreatorService == null || fstCreatorService.isTerminated()){
-            fstCreatorService =  Executors.newFixedThreadPool(tpSize,tf);
-        } else { //still running from a previous activation
-            //try to shutdownNow
-            fstCreatorService.shutdownNow();
-            fstCreatorService = Executors.newFixedThreadPool(tpSize,tf);
+        if(fstCreatorService != null && !fstCreatorService.isTerminated()){
+            //NOTE: We can not call terminateNow, because to interrupt threads
+            //      here would also close FileChannels used by the SolrCore
+            //      and produce java.nio.channels.ClosedByInterruptException
+            //      exceptions followed by 
java.nio.channels.ClosedChannelException
+            //      on following calls to affected files of the SolrIndex.
+            
+            //Because of that we just log a warning and let uncompleted tasks
+            //complete!
+            log.warn("some items in a previouse FST Runtime Creation 
Threadpool have "
+                + "still not finished!");
         }
+        fstCreatorService = Executors.newFixedThreadPool(tpSize,tf);
         
         //(6) Parse the EntityCache config
         int ecSize;
@@ -568,10 +576,10 @@ public class FstLinkingEngineComponent {
         if(reference == null && this.indexReference == null){
             return; //nothing to do
         }
-        final IndexConfiguration indexConfig;
         BundleContext bundleContext = this.bundleContext;
         synchronized (this) { //init one after the other in case of multiple 
calls
             SolrCore core;
+            IndexConfiguration indexConfig; // the indexConfig build by this 
call
             try {
                 if(bundleContext == null){ //already deactivated
                     return; //NOTE: unregistering is done in finally block
@@ -652,7 +660,7 @@ public class FstLinkingEngineComponent {
                 //check if the fst does not exist and the fstInfo allows 
creation
                 if(!fstInfo.fst.exists() && fstInfo.allowCreation){
                     //create a task on the FST corpus creation service
-                    fstCreatorService.execute(new CorpusCreationTask(core, 
fstInfo));
+                    fstCreatorService.execute(new 
CorpusCreationTask(indexConfig, fstInfo));
                 }
             }
             //set the default linking corpora
@@ -663,6 +671,8 @@ public class FstLinkingEngineComponent {
             CorpusInfo defaultCoprous = indexConfig.getCorpus(defaultLanguage);
             log.info(" ... set '{}' as default FST Corpus: {}", 
defaultCoprous.language, defaultCoprous);
             indexConfig.setDefaultCorpus(defaultCoprous);
+            //set the index configuration to the field;
+            this.indexConfig = indexConfig;
             FstLinkingEngine engine = new FstLinkingEngine(engineName, 
indexConfig,
                 textProcessingConfig, entityLinkerConfig);
             String[] services = new String [] {
@@ -858,7 +868,7 @@ public class FstLinkingEngineComponent {
                             CorpusInfo langFstInfo = new CorpusInfo(language, 
                                 encodedLangIndexField,encodedLangStoreField,
                                 fieldType.getAnalyzer(), langFstFile, 
langAllowCreation);
-                            log.info("   ... add {} for explicitly configured 
language", langFstInfo);
+                            log.debug("   ... add {} for explicitly configured 
language", langFstInfo);
                             indexConfig.addCorpus(langFstInfo);
                             foundCorpus = true;
                         } else {
@@ -902,6 +912,11 @@ public class FstLinkingEngineComponent {
             solrServer.close(); //decrease the reference count!!
             this.solrCore = null; //rest the field
         }
+        //deactivate the index configuration if present
+        if(indexConfig != null){
+            indexConfig.deactivate();
+            indexConfig = null;
+        }
     }
 
     /**
@@ -971,18 +986,16 @@ public class FstLinkingEngineComponent {
     protected void deactivate(ComponentContext ctx) {
         if(solrServerTracker != null){
             //closing the tracker will also cause registered engines to be
-            //unregistered as service (see #unregisterEngine())
+            //unregistered as service (see #updateEngineRegistration())
             solrServerTracker.close();
             solrServerTracker = null;
         }
         if(fstCreatorService != null){
-            //cancel not yet started tasks, because with the next activation
-            //those might be outdated.
-            List<Runnable> canceled = fstCreatorService.shutdownNow();
-            log.info("Cancelled FST initialistion tasks because of 
deactivation:");
-            for(Runnable r : canceled){
-                log.info(" > {}",r);
-            }
+            //we MUST NOT call shutdownNow(), because this would close
+            //low level Solr FileChannels.
+            fstCreatorService.shutdown();
+            //do not set NULL, as we want to warn users an re-activation if old
+            //threads are still running.
         }
         indexReference = null;
         engineMetadata = null;

Modified: 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java?rev=1517487&r1=1517486&r2=1517487&view=diff
==============================================================================
--- 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
 (original)
+++ 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
 Mon Aug 26 10:31:13 2013
@@ -81,6 +81,8 @@ public class IndexConfiguration {
    
     private final LanguageConfiguration fstConfig;
 
+    private boolean active = true;
+    
     public IndexConfiguration(LanguageConfiguration fstConfig, SolrCore index){
         if(fstConfig == null){
             throw new IllegalArgumentException("The parsed FST configuration 
MUST NOT be NULL!");
@@ -214,4 +216,18 @@ public class IndexConfiguration {
     public EntityCacheManager getEntityCacheManager() {
         return entityCacheManager;
     }
+    /**
+     * Deactivates this {@link IndexConfiguration}
+     */
+    public void deactivate(){
+        active = false;
+    }
+    /**
+     * If this {@link IndexConfiguration} is still active
+     * @return <code>true</code> if still active. Otherwise <code>false</code>
+     */
+    public boolean isActive(){
+        return active;
+    }
+    
 }

Modified: 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/TaggingSession.java
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/TaggingSession.java?rev=1517487&r1=1517486&r2=1517487&view=diff
==============================================================================
--- 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/TaggingSession.java
 (original)
+++ 
stanbol/trunk/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/TaggingSession.java
 Mon Aug 26 10:31:13 2013
@@ -105,7 +105,7 @@ public class TaggingSession implements C
     private int docLoaded = 0;
     private int docCached = 0;
     private int docAppended = 0;
-    private final ValueSourceAccessor uniqueKeyCache;
+    //private final ValueSourceAccessor uniqueKeyCache;
     //private final Map<Integer,Match> matchPool = new 
HashMap<Integer,Match>(2048);
     private final FieldLoaderImpl fieldLoader;
 
@@ -182,7 +182,7 @@ public class TaggingSession implements C
         searcherRef = config.getIndex().getSearcher();
         SolrIndexSearcher searcher = searcherRef.get();
         documentCacheRef = config.getEntityCacheManager().getCache(searcher);
-        uniqueKeyCache = null; //no longer used.
+//        uniqueKeyCache = null; //no longer used.
 //        uniqueKeyCache = new ValueSourceAccessor(searcher, 
idSchemaField.getType()
 //            .getValueSource(idSchemaField, null));
         fieldLoader = new FieldLoaderImpl(searcher.getIndexReader());
@@ -324,7 +324,7 @@ public class TaggingSession implements C
                     if(config.getExecutorService() != null){
                         // TODO: this code should get moved to a CorpusManager 
class
                         config.getExecutorService().execute(
-                            new CorpusCreationTask(config.getIndex(), 
fstInfo));
+                            new CorpusCreationTask(config, fstInfo));
                         throw new CorpusException("The FST corpus for language 
'"
                                 + fstInfo.language + "' was invalid and is now 
"
                                 + "enqueued for re-creation. Retry at a  later 
"


Reply via email to