Author: rwesten
Date: Mon Oct 19 11:57:34 2015
New Revision: 1709397
URL: http://svn.apache.org/viewvc?rev=1709397&view=rev
Log:
fix for STANBOL-1443: The FastLRUCacheManager is now thread save; Minor: The
FST Linking engine does no longer force a new Searcher to be opened during
initialization; The Enhancer Stress Test Tool does no longer use
Assert.assert*(..) during response processing. This caused responses not to be
marked as success or failed. Now exceptions are thrown instead.
Modified:
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/cache/FastLRUCacheManager.java
stanbol/branches/release-0.12/integration-tests/pom.xml
stanbol/branches/release-0.12/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java
Modified:
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
URL:
http://svn.apache.org/viewvc/stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java?rev=1709397&r1=1709396&r2=1709397&view=diff
==============================================================================
---
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
(original)
+++
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/IndexConfiguration.java
Mon Oct 19 11:57:34 2015
@@ -413,7 +413,7 @@ public class IndexConfiguration {
*/
public boolean activate() {
active = true;
- RefCounted<SolrIndexSearcher> searcherRef = index.getSearcher(true,
true, null);
+ RefCounted<SolrIndexSearcher> searcherRef = index.getSearcher();
try {
return processFstConfig(searcherRef.get().getAtomicReader());
}catch (RuntimeException e) { //in case of any excpetion
Modified:
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/cache/FastLRUCacheManager.java
URL:
http://svn.apache.org/viewvc/stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/cache/FastLRUCacheManager.java?rev=1709397&r1=1709396&r2=1709397&view=diff
==============================================================================
---
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/cache/FastLRUCacheManager.java
(original)
+++
stanbol/branches/release-0.12/enhancement-engines/lucenefstlinking/src/main/java/org/apache/stanbol/enhancer/engines/lucenefstlinking/cache/FastLRUCacheManager.java
Mon Oct 19 11:57:34 2015
@@ -19,6 +19,8 @@ package org.apache.stanbol.enhancer.engi
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.document.Document;
import org.apache.solr.search.CacheRegenerator;
@@ -42,6 +44,8 @@ public class FastLRUCacheManager impleme
RefCounted<EntityCache> current;
private final CacheRegenerator regenerator;
private final Map<String,String> config;
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+
/**
* Creates a cache manager instance with the parsed maximum size and no
@@ -77,36 +81,63 @@ public class FastLRUCacheManager impleme
@Override
public RefCounted<EntityCache> getCache(Object version) {
- if(current == null || !current.get().getVersion().equals(version)){
- if(current != null){
- log.debug(" > invalidate EntityCache for version {}",
current.get().getVersion());
- //remove the reference to the old instance. This will allow to
- //destroy the old cache as soon as it is no longer used
- current.decref();
- log.debug(" ... {} remaining users for invalidated Cache",
current.getRefcount());
- current = null;
+ lock.readLock().lock();
+ try {
+ if(current != null && current.get().getVersion().equals(version)){
+ current.incref(); //this increase is for the holder of the
returned instance
+ log.debug(" > increase RefCount for EntityCache for version {}
to {}",
+ version, current.getRefcount());
+ return current;
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ //still here ... looks like we need to build a new one
+ lock.writeLock().lock();
+ try {
+ //check again ... an other thread might have already built the
cache
+ //for the requested version
+ if(current == null || !current.get().getVersion().equals(version)){
+ if(current != null){
+ log.debug(" > invalidate EntityCache for version {}",
current.get().getVersion());
+ //remove the reference to the old instance. This will
allow to
+ //destroy the old cache as soon as it is no longer used
+ current.decref();
+ log.debug(" ... {} remaining users for invalidated
Cache", current.getRefcount());
+ current = null;
+ }
+ //create a new cache
+ log.debug(" > create EntityCache for version {}", version);
+ SolrCache<Integer,Document> cache = new
FastLRUCache<Integer,Document>();
+ cache.init(config, null, regenerator);
+ current = new RefCountedImpl(new SolrEntityCache(version,
cache));
+ //add a reference to the new cache by this class. This will be
removed
+ //as soon as the instance is outdated
+ current.incref();
}
- //create a new cache
- log.debug(" > create EntityCache for version {}", version);
- SolrCache<Integer,Document> cache = new
FastLRUCache<Integer,Document>();
- cache.init(config, null, regenerator);
- current = new RefCountedImpl(new SolrEntityCache(version, cache));
- //add a reference to the new cache by this class. This will be
removed
- //as soon as the instance is outdated
- current.incref();
+ current.incref(); //this increase is for the holder of the
returned instance
+ log.debug(" > increase RefCount for EntityCache for version {} to
{}",
+ version, current.getRefcount());
+ return current;
+ } finally {
+ lock.writeLock().unlock();
}
- current.incref(); //this increase is for the holder of the returned
instance
- log.debug(" > increase RefCount for EntityCache for version {} to {}",
- version, current.getRefcount());
- return current;
}
@Override
public void close() {
- if(current != null){
- current.decref();
- current = null;
- }
+ lock.writeLock().lock();
+ try {
+ if(current != null){
+ Object version = log.isDebugEnabled() ?
current.get().getVersion() : null;
+ current.decref();
+ log.debug(" > close EntityCache for version {} (remaining
refCount: {})",
+ version , current.getRefcount());
+ current = null;
+ }
+ } finally{
+ lock.writeLock().unlock();
+ }
}
@Override
Modified: stanbol/branches/release-0.12/integration-tests/pom.xml
URL:
http://svn.apache.org/viewvc/stanbol/branches/release-0.12/integration-tests/pom.xml?rev=1709397&r1=1709396&r2=1709397&view=diff
==============================================================================
--- stanbol/branches/release-0.12/integration-tests/pom.xml (original)
+++ stanbol/branches/release-0.12/integration-tests/pom.xml Mon Oct 19 11:57:34
2015
@@ -115,7 +115,7 @@
<server.ready.timeout.seconds>180</server.ready.timeout.seconds>
<server.ready.path.1>/:stanbol.css</server.ready.path.1>
<server.ready.path.2>/enhancer:Stateless REST
analysis:Accept:text/html</server.ready.path.2>
- <server.ready.path.3>/entityhub:The RESTful API of the
Entityhub</server.ready.path.3>
+ <!-- server.ready.path.3>/entityhub:The RESTful API of the
Entityhub</server.ready.path.3 -->
<!-- comment the following paths to use the integration test with
the stable launcher
<server.ready.path.4>/contenthub:Recently uploaded Content
Items</server.ready.path.4>
<server.ready.path.5>/ontonet:Apache Stanbol
OntoNet:Accept:text/html</server.ready.path.5>
Modified:
stanbol/branches/release-0.12/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java
URL:
http://svn.apache.org/viewvc/stanbol/branches/release-0.12/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java?rev=1709397&r1=1709396&r2=1709397&view=diff
==============================================================================
---
stanbol/branches/release-0.12/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java
(original)
+++
stanbol/branches/release-0.12/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java
Mon Oct 19 11:57:34 2015
@@ -56,6 +56,7 @@ import org.apache.commons.compress.archi
import
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -78,6 +79,8 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.hp.hpl.jena.graph.GetTriple;
+
/**
* Base class for multi threaded tests
* @author westei
@@ -374,14 +377,19 @@ public abstract class MultiThreadedTestB
int testNum;
for(testNum = 0;testDataIterator.hasNext() && testNum <
settings.getMaxRequests(); testNum++){
String test = testDataIterator.next();
- Request request = builder.buildPostRequest(getEndpoint())
- .withHeader("Accept",rdfFormat)
- .withContent(test);
- tracker.register(request,test);
- if(testNum%100 == 0){
- log.info(" ... sent {} Requests ({} finished, {} pending, {}
failed",
- new Object[]{testNum,tracker.getNumCompleted(),
-
tracker.getNumPending(),tracker.getFailed().size()});
+ if(StringUtils.isNotBlank(test)){
+ Request request = builder.buildPostRequest(getEndpoint())
+ .withHeader("Accept",rdfFormat)
+ .withContent(test);
+ tracker.register(request, test);
+ if(testNum%100 == 0){
+ log.info(" ... sent {} Requests ({} finished, {} pending,
{} failed",
+ new Object[]{testNum,tracker.getNumCompleted(),
+
tracker.getNumPending(),tracker.getFailed().size()});
+ }
+ } else {
+ log.warn(" - TestDataIterator returned empty or NULL content
(igonred)");
+ testNum--;
}
}
log.info("> All {} requests sent!",testNum);
@@ -396,7 +404,7 @@ public abstract class MultiThreadedTestB
tracker.printStatistics();
log.warn("Content(s) of Faild tests:");
int i=1;
- for(Entry<RequestExecutor,String> failed
:tracker.getFailed().entrySet()) {
+ for(Entry<RequestExecutor,String> failed :
tracker.getFailed().entrySet()) {
log.warn("Failed ({}):",i);
log.warn(" > Request: {}"+failed.getKey().getRequest());
log.warn(" > Response: {}"+failed.getKey().getResponse());
@@ -610,12 +618,12 @@ public abstract class MultiThreadedTestB
protected ExcutionTracker(ExecutorService executorService){
this(executorService,100);
}
- public ExcutionTracker(ExecutorService executorService,int
maxRegistered) {
+ public ExcutionTracker(ExecutorService executorService, int
maxRegistered) {
this.executorService = executorService;
this.maxRegistered = maxRegistered <= 0 ? Integer.MAX_VALUE :
maxRegistered;
}
- public void register(Request request,String content){
+ public void register(Request request, String content){
synchronized (registered) {
while(registered.size() >= maxRegistered){
try {
@@ -625,7 +633,7 @@ public abstract class MultiThreadedTestB
}
}
registered.add(request);
- executorService.execute(new AsyncExecuter(content,request,
this));
+ executorService.execute(new AsyncExecuter(content, request,
this));
}
}
@@ -645,7 +653,7 @@ public abstract class MultiThreadedTestB
}
}
- void failed(Request request, String content,RequestExecutor executor) {
+ void failed(Request request, String content, RequestExecutor executor)
{
synchronized (registered) {
failed.put(executor,content);
if(registered.remove(request)){
@@ -738,7 +746,6 @@ public abstract class MultiThreadedTestB
Long rtt;
try {
executor.execute(request).assertStatus(200);
- content = null; //do not store content for successfull resutls
rtt = System.currentTimeMillis()-start;
} catch (Throwable e) {
log.warn("Error while sending Request ",e);
@@ -749,17 +756,22 @@ public abstract class MultiThreadedTestB
IndexedMGraph graph = new IndexedMGraph();
try {
rdfParser.parse(graph,executor.getStream(),
executor.getContentType().getMimeType());
+ Iterator<Triple> ciIt = graph.filter(null,
Properties.ENHANCER_EXTRACTED_FROM, null);
+ if(!ciIt.hasNext()){
+ throw new IllegalStateException("Enhancement Results do
not caontain a single Enhancement");
+ }
+ Resource contentItemUri = ciIt.next().getObject();
+ if(!(contentItemUri instanceof UriRef)){
+ throw new IllegalStateException("ContentItem URI is not an
UriRef but an instance of "
+ + contentItemUri.getClass().getSimpleName());
+ }
+ tracker.succeed(request, (UriRef) contentItemUri, graph, rtt,
executor.getContent().length());
+ content = null; //do not store content for successful results
} catch (Exception e) {
log.warn("Exception while parsing Enhancement Response",e);
tracker.failed(request, content, executor);
return;
}
- Iterator<Triple> ciIt = graph.filter(null,
Properties.ENHANCER_EXTRACTED_FROM, null);
- Assert.assertTrue("Enhancement Results do not caontain a single
Enhancement",ciIt.hasNext());
- Resource contentItemUri = ciIt.next().getObject();
- Assert.assertTrue("ContentItem URI is not an UriRef but an
instance of "
- + contentItemUri.getClass().getSimpleName(),
contentItemUri instanceof UriRef);
- tracker.succeed(request, (UriRef) contentItemUri, graph, rtt,
executor.getContent().length());
}
}