Author: kwright
Date: Sat Jan 26 12:43:49 2013
New Revision: 1438887
URL: http://svn.apache.org/viewvc?rev=1438887&view=rev
Log:
Go to session-based model for managing ElasticSearch connections; part of
CONNECTORS-606.
Modified:
manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnector.java
Modified:
manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnector.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnector.java?rev=1438887&r1=1438886&r2=1438887&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnector.java (original)
+++ manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnector.java Sat Jan 26 12:43:49 2013
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
@@ -90,10 +91,13 @@ public class ElasticSearchConnector exte
/** Forward to the template to view the specification parameters for the job
*/
private static final String VIEW_SPEC_FORWARD = "viewSpecification.html";
+ /** Connection expiration interval */
+ private static final long EXPIRATION_INTERVAL = 60000L;
private ClientConnectionManager connectionManager = null;
private HttpClient client = null;
-
+ private long expirationTime = -1L;
+
public ElasticSearchConnector()
{
}
@@ -102,23 +106,44 @@ public class ElasticSearchConnector exte
public void connect(ConfigParams configParams)
{
super.connect(configParams);
- PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager();
- localConnectionManager.setMaxTotal(1);
- connectionManager = localConnectionManager;
- DefaultHttpClient localClient = new DefaultHttpClient(connectionManager);
- // No retries
- localClient.setHttpRequestRetryHandler(new HttpRequestRetryHandler()
- {
- public boolean retryRequest(
- IOException exception,
- int executionCount,
- HttpContext context)
- {
- return false;
- }
-
- });
- client = localClient;
+ }
+
+ protected HttpClient getSession()
+ throws ManifoldCFException
+ {
+ if (client == null)
+ {
+ PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager();
+ localConnectionManager.setMaxTotal(1);
+ connectionManager = localConnectionManager;
+ DefaultHttpClient localClient = new DefaultHttpClient(connectionManager);
+ // No retries
+ localClient.setHttpRequestRetryHandler(new HttpRequestRetryHandler()
+ {
+ public boolean retryRequest(
+ IOException exception,
+ int executionCount,
+ HttpContext context)
+ {
+ return false;
+ }
+
+ });
+ client = localClient;
+ }
+ expirationTime = System.currentTimeMillis() + EXPIRATION_INTERVAL;
+ return client;
+ }
+
+ protected void closeSession()
+ {
+ if (connectionManager != null)
+ {
+ connectionManager.shutdown();
+ connectionManager = null;
+ }
+ client = null;
+ expirationTime = -1L;
}
@Override
@@ -126,18 +151,22 @@ public class ElasticSearchConnector exte
throws ManifoldCFException
{
super.disconnect();
- connectionManager.shutdown();
- connectionManager = null;
- client = null;
+ closeSession();
}
+
@Override
public void poll()
throws ManifoldCFException
{
super.poll();
- // Free idle connections in the pool.
- // MHL
+ if (connectionManager != null)
+ {
+ if (System.currentTimeMillis() > expirationTime)
+ {
+ closeSession();
+ }
+ }
}
@Override
@@ -335,6 +364,7 @@ public class ElasticSearchConnector exte
IOutputAddActivity activities) throws ManifoldCFException,
ServiceInterruption
{
+ HttpClient client = getSession();
ElasticSearchConfig config = getConfigParameters(null);
InputStream inputStream = document.getBinaryStream();
long startTime = System.currentTimeMillis();
@@ -353,6 +383,7 @@ public class ElasticSearchConnector exte
IOutputRemoveActivity activities) throws ManifoldCFException,
ServiceInterruption
{
+ HttpClient client = getSession();
long startTime = System.currentTimeMillis();
ElasticSearchDelete od = new ElasticSearchDelete(client, documentURI,
getConfigParameters(null));
@@ -363,6 +394,7 @@ public class ElasticSearchConnector exte
@Override
public String check() throws ManifoldCFException
{
+ HttpClient client = getSession();
ElasticSearchAction oss = new ElasticSearchAction(client,
CommandEnum._status,
getConfigParameters(null), true);
String resultName = oss.getResult().name();
@@ -375,6 +407,7 @@ public class ElasticSearchConnector exte
public void noteJobComplete(IOutputNotifyActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ HttpClient client = getSession();
long startTime = System.currentTimeMillis();
ElasticSearchAction oo = new ElasticSearchAction(client,
CommandEnum._optimize,
getConfigParameters(null), false);