Repository: nifi
Updated Branches:
  refs/heads/master ef80549d6 -> 1e56de952


NIFI-1417 Exposing several connection settings on the Solr processors to help
deal with timeouts, and adding an @OnStopped method to close the SolrClient,
which did not appear to be closed before

This closes #194


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e56de95
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e56de95
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e56de95

Branch: refs/heads/master
Commit: 1e56de9521e4bc0752b419ffc7d62e096db1c389
Parents: ef80549
Author: Bryan Bende <[email protected]>
Authored: Fri Jan 29 11:12:30 2016 -0500
Committer: Bryan Bende <[email protected]>
Committed: Mon Feb 1 18:42:00 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/solr/GetSolr.java    |  6 ++
 .../processors/solr/PutSolrContentStream.java   |  6 ++
 .../nifi/processors/solr/SolrProcessor.java     | 92 ++++++++++++++++++--
 .../solr/TestPutSolrContentStream.java          | 11 +--
 4 files changed, 105 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1e56de95/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index a85aa0f..0de5b08 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -139,6 +139,12 @@ public class GetSolr extends SolrProcessor {
         descriptors.add(SORT_CLAUSE);
         descriptors.add(DATE_FIELD);
         descriptors.add(BATCH_SIZE);
+        descriptors.add(SOLR_SOCKET_TIMEOUT);
+        descriptors.add(SOLR_CONNECTION_TIMEOUT);
+        descriptors.add(SOLR_MAX_CONNECTIONS);
+        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
+        descriptors.add(ZK_CLIENT_TIMEOUT);
+        descriptors.add(ZK_CONNECTION_TIMEOUT);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e56de95/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index df034c9..ca16286 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -123,6 +123,12 @@ public class PutSolrContentStream extends SolrProcessor {
         descriptors.add(CONTENT_STREAM_PATH);
         descriptors.add(CONTENT_TYPE);
         descriptors.add(COMMIT_WITHIN);
+        descriptors.add(SOLR_SOCKET_TIMEOUT);
+        descriptors.add(SOLR_CONNECTION_TIMEOUT);
+        descriptors.add(SOLR_MAX_CONNECTIONS);
+        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
+        descriptors.add(ZK_CLIENT_TIMEOUT);
+        descriptors.add(ZK_CONNECTION_TIMEOUT);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e56de95/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
index 27f208a..2941382 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
@@ -18,7 +18,9 @@
  */
 package org.apache.nifi.processors.solr;
 
+import org.apache.http.client.HttpClient;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -28,12 +30,15 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.params.ModifiableSolrParams;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A base class for processors that interact with Apache Solr.
@@ -71,6 +76,54 @@ public abstract class SolrProcessor extends 
AbstractProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new 
PropertyDescriptor
+            .Builder().name("Solr Socket Timeout")
+            .description("The amount of time to wait for data on a socket 
connection to Solr. A value of 0 indicates an infinite timeout.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 seconds")
+            .build();
+
+    public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new 
PropertyDescriptor
+            .Builder().name("Solr Connection Timeout")
+            .description("The amount of time to wait when establishing a 
connection to Solr. A value of 0 indicates an infinite timeout.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 seconds")
+            .build();
+
+    public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new 
PropertyDescriptor
+            .Builder().name("Solr Maximum Connections")
+            .description("The maximum number of total connections allowed from 
the Solr client to Solr.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new 
PropertyDescriptor
+            .Builder().name("Solr Maximum Connections Per Host")
+            .description("The maximum number of connections allowed from the 
Solr client to a single Solr host.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5")
+            .build();
+
+    public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new 
PropertyDescriptor
+            .Builder().name("ZooKeeper Client Timeout")
+            .description("The amount of time to wait for data on a connection 
to ZooKeeper, only used with a Solr Type of Cloud.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+            .defaultValue("10 seconds")
+            .build();
+
+    public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new 
PropertyDescriptor
+            .Builder().name("ZooKeeper Connection Timeout")
+            .description("The amount of time to wait when establishing a 
connection to ZooKeeper, only used with a Solr Type of Cloud.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+            .defaultValue("10 seconds")
+            .build();
+
     private volatile SolrClient solrClient;
 
     @OnScheduled
@@ -78,6 +131,17 @@ public abstract class SolrProcessor extends 
AbstractProcessor {
         this.solrClient = createSolrClient(context);
     }
 
+    @OnStopped
+    public final void closeClient() {
+        if (solrClient != null) {
+            try {
+                solrClient.close();
+            } catch (IOException e) {
+                getLogger().debug("Error closing SolrClient", e);
+            }
+        }
+    }
+
     /**
      * Create a SolrClient based on the type of Solr specified.
      *
@@ -86,13 +150,31 @@ public abstract class SolrProcessor extends 
AbstractProcessor {
      * @return an HttpSolrClient or CloudSolrClient
      */
     protected SolrClient createSolrClient(final ProcessContext context) {
+        final String solrLocation = 
context.getProperty(SOLR_LOCATION).getValue();
+        final Integer socketTimeout = 
context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final Integer connectionTimeout = 
context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final Integer maxConnections = 
context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
+        final Integer maxConnectionsPerHost = 
context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
+
+        final ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
+        params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
+        params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
+        params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 
maxConnectionsPerHost);
+
+        final HttpClient httpClient = HttpClientUtil.createClient(params);
+
         if 
(SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
-            return new 
HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue());
+            return new HttpSolrClient(solrLocation, httpClient);
         } else {
-            CloudSolrClient cloudSolrClient = new CloudSolrClient(
-                    context.getProperty(SOLR_LOCATION).getValue());
-            cloudSolrClient.setDefaultCollection(
-                    
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
+            final String collection = 
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
+            final Integer zkClientTimeout = 
context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+            final Integer zkConnectionTimeout = 
context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+            CloudSolrClient cloudSolrClient = new 
CloudSolrClient(solrLocation, httpClient);
+            cloudSolrClient.setDefaultCollection(collection);
+            cloudSolrClient.setZkClientTimeout(zkClientTimeout);
+            cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
             return cloudSolrClient;
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e56de95/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
index 336b287..c309d78 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -101,7 +101,7 @@ public class TestPutSolrContentStream {
         try (FileInputStream fileIn = new 
FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
             runner.enqueue(fileIn);
 
-            runner.run();
+            runner.run(1, false);
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
             runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@@ -133,7 +133,7 @@ public class TestPutSolrContentStream {
         try (FileInputStream fileIn = new 
FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
             runner.enqueue(fileIn);
 
-            runner.run();
+            runner.run(1, false);
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
             runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@@ -159,7 +159,7 @@ public class TestPutSolrContentStream {
         try (FileInputStream fileIn = new 
FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
             runner.enqueue(fileIn);
 
-            runner.run();
+            runner.run(1, false);
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
             runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@@ -185,7 +185,7 @@ public class TestPutSolrContentStream {
         try (FileInputStream fileIn = new 
FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
             runner.enqueue(fileIn);
 
-            runner.run();
+            runner.run(1, false);
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
             runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@@ -225,7 +225,7 @@ public class TestPutSolrContentStream {
 
         // run the processor with a delete-by-query command
         
runner.enqueue("<delete><query>first:bob</query></delete>".getBytes("UTF-8"));
-        runner.run();
+        runner.run(1, false);
 
         // prove the document got deleted
         qResponse = solrClient.query(query);
@@ -345,6 +345,7 @@ public class TestPutSolrContentStream {
         runner.assertValid();
     }
 
+
     @Test
     public void testSolrTypeStandardShouldNotRequireCollection() {
         final TestRunner runner = 
TestRunners.newTestRunner(PutSolrContentStream.class);

Reply via email to