Adding nifi-solr-bundle with processors adding and retrieving data from Apache Solr


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/71b6ffc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/71b6ffc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/71b6ffc9

Branch: refs/heads/develop
Commit: 71b6ffc95885cf648c38b0ae2c8283cd849332e4
Parents: 0bd2784
Author: bbende <[email protected]>
Authored: Sun Mar 22 18:59:22 2015 -0400
Committer: bbende <[email protected]>
Committed: Sun Mar 22 18:59:22 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |  39 ++-
 .../nifi-solr-bundle/nifi-solr-nar/pom.xml      |  36 ++
 .../nifi-solr-processors/pom.xml                |  92 +++++
 .../apache/nifi/processors/solr/GetSolr.java    | 310 +++++++++++++++++
 .../processors/solr/PutSolrContentStream.java   | 226 ++++++++++++
 .../nifi/processors/solr/RequestParamsUtil.java |  85 +++++
 .../nifi/processors/solr/SolrProcessor.java     | 156 +++++++++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../solr/EmbeddedSolrServerFactory.java         |  85 +++++
 .../processors/solr/RequestParamsUtilTest.java  |  49 +++
 .../nifi/processors/solr/TestGetSolr.java       | 198 +++++++++++
 .../solr/TestPutSolrContentStream.java          | 344 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  14 +
 .../src/test/resources/solr/solr.xml            |  18 +
 .../testCollection/conf/_rest_managed.json      |   3 +
 .../testCollection/conf/lang/stopwords_en.txt   |  54 +++
 .../resources/testCollection/conf/protwords.txt |  21 ++
 .../resources/testCollection/conf/schema.xml    |  21 ++
 .../testCollection/conf/solrconfig.xml          |  20 ++
 .../resources/testCollection/conf/synonyms.txt  |  29 ++
 .../resources/testCollection/core.properties    |   1 +
 .../testdata/test-csv-multiple-docs.csv         |   2 +
 .../testdata/test-custom-json-single-doc.json   |  15 +
 .../testdata/test-solr-json-multiple-docs.json  |  18 +
 .../testdata/test-xml-multiple-docs.xml         |  18 +
 nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml  |  39 +++
 nifi/nifi-nar-bundles/pom.xml                   |   1 +
 nifi/pom.xml                                    |  34 +-
 28 files changed, 1913 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index ece7dbb..fb7b87e 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -45,7 +45,7 @@
                             <tarLongFileMode>posix</tarLongFileMode>
                         </configuration>
                     </execution>
-                </executions>    
+                </executions>
             </plugin>
         </plugins>
     </build>
@@ -166,9 +166,14 @@
             <artifactId>nifi-kite-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-solr-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
-    
-    <properties>        
+
+    <properties>
         <!--Wrapper Properties-->
         
<nifi.wrapper.jvm.heap.initial.mb>256</nifi.wrapper.jvm.heap.initial.mb>
         <nifi.wrapper.jvm.heap.max.mb>512</nifi.wrapper.jvm.heap.max.mb>
@@ -176,7 +181,7 @@
         <nifi.max.permgen.size.mb>128</nifi.max.permgen.size.mb>
         <nifi.wrapper.logfile.maxsize>10m</nifi.wrapper.logfile.maxsize>
         <nifi.wrapper.logfile.maxfiles>10</nifi.wrapper.logfile.maxfiles>
-        
+
         <!-- nifi.properties: core properties -->
         <nifi.version>${project.version}</nifi.version>
         
<nifi.flowcontroller.autoResumeState>true</nifi.flowcontroller.autoResumeState>
@@ -204,7 +209,7 @@
         <nifi.swap.in.threads>1</nifi.swap.in.threads>
         <nifi.swap.out.period>5 sec</nifi.swap.out.period>
         <nifi.swap.out.threads>4</nifi.swap.out.threads>
-               
+
         
<nifi.content.repository.implementation>org.apache.nifi.controller.repository.FileSystemRepository</nifi.content.repository.implementation>
         <nifi.content.claim.max.appendable.size>10 
MB</nifi.content.claim.max.appendable.size>
         
<nifi.content.claim.max.flow.files>100</nifi.content.claim.max.flow.files>
@@ -214,21 +219,21 @@
         
<nifi.content.repository.archive.enabled>false</nifi.content.repository.archive.enabled>
         
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
         <nifi.content.viewer.url />
-        
-        
+
+
         <nifi.restore.directory />
         <nifi.ui.banner.text />
         <nifi.ui.autorefresh.interval>30 sec</nifi.ui.autorefresh.interval>
         <nifi.nar.library.directory>./lib</nifi.nar.library.directory>
         <nifi.nar.working.directory>./work/nar/</nifi.nar.working.directory>
         
<nifi.documentation.working.directory>./work/docs/components</nifi.documentation.working.directory>
-        
+
         
<nifi.sensitive.props.algorithm>PBEWITHMD5AND256BITAES-CBC-OPENSSL</nifi.sensitive.props.algorithm>
         <nifi.sensitive.props.provider>BC</nifi.sensitive.props.provider>
         
<nifi.h2.url.append>;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE</nifi.h2.url.append>
 
         <nifi.remote.input.socket.port>9990</nifi.remote.input.socket.port>
-        
+
         <!-- persistent provenance repository properties -->
         
<nifi.provenance.repository.implementation>org.apache.nifi.provenance.PersistentProvenanceRepository</nifi.provenance.repository.implementation>
         
<nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default>
@@ -243,15 +248,15 @@
         <nifi.provenance.repository.index.shard.size>500 
MB</nifi.provenance.repository.index.shard.size>
         
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
         
<nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
-        
+
         <!-- volatile provenance repository properties -->
         
<nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>
-        
+
         <!-- Component status repository properties -->
         
<nifi.components.status.repository.implementation>org.apache.nifi.controller.status.history.VolatileComponentStatusRepository</nifi.components.status.repository.implementation>
         
<nifi.components.status.repository.buffer.size>288</nifi.components.status.repository.buffer.size>
         <nifi.components.status.snapshot.frequency>5 
mins</nifi.components.status.snapshot.frequency>
-        
+
         <!-- nifi.properties: web properties -->
         <nifi.web.war.directory>./lib</nifi.web.war.directory>
         <nifi.web.http.host />
@@ -260,7 +265,7 @@
         <nifi.web.https.port />
         <nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir>
         <nifi.web.jetty.threads>200</nifi.web.jetty.threads>
-        
+
         <!-- nifi.properties: security properties -->
         <nifi.security.keystore />
         <nifi.security.keystoreType />
@@ -277,12 +282,12 @@
         <nifi.security.support.new.account.requests />
         <nifi.security.ocsp.responder.url />
         <nifi.security.ocsp.responder.certificate />
-        
+
         <!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) -->
         <nifi.cluster.protocol.heartbeat.interval>5 
sec</nifi.cluster.protocol.heartbeat.interval>
         
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
         <nifi.cluster.protocol.socket.timeout>30 
sec</nifi.cluster.protocol.socket.timeout>
-        <nifi.cluster.protocol.connection.handshake.timeout>45 
sec</nifi.cluster.protocol.connection.handshake.timeout> 
+        <nifi.cluster.protocol.connection.handshake.timeout>45 
sec</nifi.cluster.protocol.connection.handshake.timeout>
         
<nifi.cluster.protocol.use.multicast>false</nifi.cluster.protocol.use.multicast>
         <nifi.cluster.protocol.multicast.address />
         <nifi.cluster.protocol.multicast.port />
@@ -297,7 +302,7 @@
         
<nifi.cluster.node.protocol.threads>2</nifi.cluster.node.protocol.threads>
         <nifi.cluster.node.unicast.manager.address />
         <nifi.cluster.node.unicast.manager.protocol.port />
-        
+
         <!-- nifi.properties: cluster manager properties (only configure for cluster manager) -->
         <nifi.cluster.is.manager>false</nifi.cluster.is.manager>
         <nifi.cluster.manager.address />
@@ -349,7 +354,7 @@
                                 </configuration>
                             </execution>
                         </executions>
-                    </plugin>                    
+                    </plugin>
                     <plugin>
                         <groupId>org.codehaus.mojo</groupId>
                         <artifactId>rpm-maven-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml
new file mode 100644
index 0000000..aeab88c
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-solr-bundle</artifactId>
+        <version>0.1.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-solr-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-solr-processors</artifactId>
+            <version>0.1.0-incubating-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
new file mode 100644
index 0000000..0e05003
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-solr-bundle</artifactId>
+        <version>0.1.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-solr-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-solrj</artifactId>
+            <version>${solr.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>1.1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-core</artifactId>
+            <version>${solr.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- These Lucene deps should be brought in through solr-core, but there
+         appears to be an issue with 5.0.0 that still references some 4.10.3 poms -->
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-core</artifactId>
+            <version>${solr.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-analyzers-common</artifactId>
+            <version>${solr.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-queryparser</artifactId>
+            <version>${solr.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
new file mode 100644
index 0000000..d230cc1
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Tags({"Apache", "Solr", "Get", "Pull"})
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
+public class GetSolr extends SolrProcessor {
+
+    public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
+            .Builder().name("Solr Query")
+            .description("A query to execute against Solr")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RETURN_FIELDS = new 
PropertyDescriptor
+            .Builder().name("Return Fields")
+            .description("Comma-separated list of fields names to return")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor
+            .Builder().name("Sort Clause")
+            .description("A Solr sort clause, ex: field1 asc, field2 desc")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor
+            .Builder().name("Date Field")
+            .description("The name of a date field in Solr used to filter 
results")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
+            .Builder().name("Batch Size")
+            .description("Number of rows per Solr query")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The results of querying Solr")
+            .build();
+
+    static final String FILE_PREFIX = "conf/.getSolr-";
+    static final String LAST_END_DATE = "LastEndDate";
+    static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    static final String UNINITIALIZED_LAST_END_DATE_VALUE;
+
+    static {
+        SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, 
Locale.US);
+        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+        UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L));
+    }
+
+    final AtomicReference<String> lastEndDatedRef = new 
AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE);
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    private final Lock fileLock = new ReentrantLock();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SOLR_TYPE);
+        descriptors.add(SOLR_LOCATION);
+        descriptors.add(DEFAULT_COLLECTION);
+        descriptors.add(SOLR_QUERY);
+        descriptors.add(RETURN_FIELDS);
+        descriptors.add(SORT_CLAUSE);
+        descriptors.add(DATE_FIELD);
+        descriptors.add(BATCH_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+        lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+    }
+
+    @OnShutdown
+    public void onShutdown() {
+        writeLastEndDate();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final ProcessorLog logger = getLogger();
+
+        final FlowFile incomingFlowFile = session.get();
+        if (incomingFlowFile != null) {
+            session.transfer(incomingFlowFile, REL_SUCCESS);
+            logger.warn("found FlowFile {} in input queue; transferring to 
success",
+                    new Object[]{incomingFlowFile});
+        }
+
+        readLastEndDate();
+
+        final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
+        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+        final String currDate = sdf.format(new Date());
+
+        final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
+
+        final String query = context.getProperty(SOLR_QUERY).getValue();
+        final SolrQuery solrQuery = new SolrQuery(query);
+        solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
+
+        // if initialized then apply a filter to restrict results from the last end time til now
+        if (initialized) {
+            StringBuilder filterQuery = new StringBuilder();
+            filterQuery.append(context.getProperty(DATE_FIELD).getValue())
+                    .append(":{").append(lastEndDatedRef.get()).append(" TO ")
+                    .append(currDate).append("]");
+            solrQuery.addFilterQuery(filterQuery.toString());
+            logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
+        }
+
+        final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
+        if (returnFields != null && !returnFields.trim().isEmpty()) {
+            for (String returnField : returnFields.trim().split("[,]")) {
+                solrQuery.addField(returnField.trim());
+            }
+        }
+
+        final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
+        if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
+            for (String sortClause : fullSortClause.split("[,]")) {
+                String[] sortParts = sortClause.trim().split("[ ]");
+                solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
+            }
+        }
+
+        try {
+            // run the initial query and send out the first page of results
+            final StopWatch stopWatch = new StopWatch(true);
+            QueryResponse response = getSolrServer().query(solrQuery);
+            stopWatch.stop();
+
+            long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+
+            final SolrDocumentList documentList = response.getResults();
+            logger.info("Retrieved {} results from Solr for {} in {} ms",
+                    new Object[] {documentList.getNumFound(), query, 
duration});
+
+            if (documentList != null && documentList.getNumFound() > 0) {
+                FlowFile flowFile = session.create();
+                flowFile = session.write(flowFile, new 
QueryResponseOutputStreamCallback(response));
+                session.transfer(flowFile, REL_SUCCESS);
+
+                StringBuilder transitUri = new StringBuilder("solr://");
+                
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
+                if 
(SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
+                    
transitUri.append("/").append(context.getProperty(DEFAULT_COLLECTION).getValue());
+                }
+
+                session.getProvenanceReporter().receive(flowFile, 
transitUri.toString(), duration);
+
+                // if initialized then page through the results and send out each page
+                if (initialized) {
+                    int endRow = response.getResults().size();
+                    long totalResults = response.getResults().getNumFound();
+
+                    while (endRow < totalResults) {
+                        solrQuery.setStart(endRow);
+
+                        stopWatch.start();
+                        response = getSolrServer().query(solrQuery);
+                        stopWatch.stop();
+
+                        duration = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                        logger.info("Retrieved results for {} in {} ms", new 
Object[]{query, duration});
+
+                        flowFile = session.create();
+                        flowFile = session.write(flowFile, new 
QueryResponseOutputStreamCallback(response));
+                        session.transfer(flowFile, REL_SUCCESS);
+                        session.getProvenanceReporter().receive(flowFile, 
transitUri.toString(), duration);
+                        endRow += response.getResults().size();
+                    }
+                }
+            }
+
+            lastEndDatedRef.set(currDate);
+            writeLastEndDate();
+        } catch (SolrServerException e) {
+            context.yield();
+            session.rollback();
+            logger.error("Failed to execute query {} due to {}", new 
Object[]{query, e}, e);
+            throw new ProcessException(e);
+        } catch (final Throwable t) {
+            context.yield();
+            session.rollback();
+            logger.error("Failed to execute query {} due to {}", new 
Object[]{query, t}, t);
+            throw t;
+        }
+    }
+
+    private void readLastEndDate() {
+        fileLock.lock();
+        File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
+        try (FileInputStream fis = new FileInputStream(lastEndDateCache)) {
+            Properties props = new Properties();
+            props.load(fis);
+            lastEndDatedRef.set(props.getProperty(LAST_END_DATE));
+        } catch (IOException swallow) {
+        } finally {
+            fileLock.unlock();
+        }
+    }
+
+    private void writeLastEndDate() {
+        fileLock.lock();
+        File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
+        try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
+            Properties props = new Properties();
+            props.setProperty(LAST_END_DATE, lastEndDatedRef.get());
+            props.store(fos, "GetSolr LastEndDate value");
+        } catch (IOException e) {
+            getLogger().error("Failed to persist LastEndDate due to " + e, e);
+        } finally {
+            fileLock.unlock();
+        }
+    }
+
+    /**
+     * Writes each SolrDocument in XML format to the OutputStream.
+     */
+    private class QueryResponseOutputStreamCallback implements 
OutputStreamCallback {
+        private QueryResponse response;
+
+        public QueryResponseOutputStreamCallback(QueryResponse response) {
+            this.response = response;
+        }
+
+        @Override
+        public void process(OutputStream out) throws IOException {
+            for (SolrDocument doc : response.getResults()) {
+                String xml = 
ClientUtils.toXML(ClientUtils.toSolrInputDocument(doc));
+                IOUtils.write(xml, out);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
new file mode 100644
index 0000000..704d8a2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.StopWatch;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.util.ContentStreamBase;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Apache", "Solr", "Put", "Send"})
+@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to 
Solr")
+public class PutSolrContentStream extends SolrProcessor {
+
+    public static final PropertyDescriptor CONTENT_STREAM_URL = new 
PropertyDescriptor
+            .Builder().name("Content Stream URL")
+            .description("The URL in Solr to post the ContentStream")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("/update/json/docs")
+            .build();
+
+    public static final PropertyDescriptor CONTENT_TYPE = new 
PropertyDescriptor
+            .Builder().name("Content-Type")
+            .description("Content-Type being sent to Solr")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("application/json")
+            .build();
+
+    public static final PropertyDescriptor REQUEST_PARAMS = new 
PropertyDescriptor
+            .Builder().name("Request Parameters")
+            .description("Additional parameters to pass to Solr on each 
request, i.e. key1=val1&key2=val2")
+            .required(false)
+            .addValidator(RequestParamsUtil.getValidator())
+            .defaultValue("json.command=false&split=/&f=id:/field1")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed for any reason other than Solr 
being unreachable")
+            .build();
+
+    public static final Relationship REL_CONNECTION_FAILURE = new 
Relationship.Builder()
+            .name("connection_failure")
+            .description("FlowFiles that failed because Solr is unreachable")
+            .build();
+
+    /**
+     * The name of a FlowFile attribute used for specifying a Solr collection.
+     */
+    public static final String SOLR_COLLECTION_ATTR = "solr.collection";
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    private volatile MultiMapSolrParams requestParams;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SOLR_TYPE);
+        descriptors.add(SOLR_LOCATION);
+        descriptors.add(DEFAULT_COLLECTION);
+        descriptors.add(CONTENT_STREAM_URL);
+        descriptors.add(CONTENT_TYPE);
+        descriptors.add(REQUEST_PARAMS);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_CONNECTION_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    @Override
+    protected void additionalOnScheduled(ProcessContext context) {
+        final String requestParamsVal = 
context.getProperty(REQUEST_PARAMS).getValue();
+        this.requestParams = RequestParamsUtil.parse(requestParamsVal);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final ObjectHolder<SolrException> error = new ObjectHolder<>(null);
+        final ObjectHolder<SolrServerException> connectionError = new 
ObjectHolder<>(null);
+        final ObjectHolder<String> collectionUsed = new ObjectHolder<>(null);
+
+        final String collectionAttrVal = 
flowFile.getAttribute(SOLR_COLLECTION_ATTR);
+        final boolean isSolrCloud = 
SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
+
+        StopWatch timer = new StopWatch(true);
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                ContentStreamUpdateRequest request = new 
ContentStreamUpdateRequest(
+                        context.getProperty(CONTENT_STREAM_URL).getValue());
+                request.setParams(new ModifiableSolrParams());
+
+                // add the extra params, don't use 'set' in case of repeating 
params
+                Iterator<String> paramNames = 
requestParams.getParameterNamesIterator();
+                while (paramNames.hasNext()) {
+                    String paramName = paramNames.next();
+                    for (String paramValue : 
requestParams.getParams(paramName)) {
+                        request.getParams().add(paramName, paramValue);
+                    }
+                }
+
+                // send the request to the specified collection, or to the 
default collection
+                if (isSolrCloud) {
+                    String collection = collectionAttrVal;
+                    if (StringUtils.isBlank(collection)) {
+                        collection = 
context.getProperty(DEFAULT_COLLECTION).getValue();
+                    }
+                    request.setParam("collection", collection);
+                    collectionUsed.set(collection);
+                }
+
+                try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(in)) {
+                    // add the FlowFile's content on the UpdateRequest
+                    request.addContentStream(new ContentStreamBase() {
+                        @Override
+                        public InputStream getStream() throws IOException {
+                            return bufferedIn;
+                        }
+
+                        @Override
+                        public String getContentType() {
+                            return 
context.getProperty(CONTENT_TYPE).getValue();
+                        }
+                    });
+
+                    UpdateResponse response = request.process(getSolrServer());
+                    getLogger().debug("Got {} response from Solr", new 
Object[]{response.getStatus()});
+                } catch (SolrException e) {
+                    error.set(e);
+                } catch (SolrServerException e) {
+                    connectionError.set(e);
+                }
+            }
+        });
+        timer.stop();
+
+        if (error.get() != null) {
+            getLogger().error("Failed to send {} to Solr due to {} with status 
code {}; routing to failure",
+                    new Object[]{flowFile, error.get(), error.get().code()});
+            session.transfer(flowFile, REL_FAILURE);
+        } else if (connectionError.get() != null) {
+            getLogger().error("Failed to send {} to Solr due to {}; routing to 
connection_failure",
+                    new Object[]{flowFile, connectionError.get()});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_CONNECTION_FAILURE);
+        } else {
+            StringBuilder transitUri = new StringBuilder("solr://");
+            transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
+            if (isSolrCloud) {
+                transitUri.append(":").append(collectionUsed.get());
+            }
+
+            final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+            session.getProvenanceReporter().send(flowFile, 
transitUri.toString(), duration, true);
+            getLogger().info("Successfully sent {} to Solr in {} millis", new 
Object[]{flowFile, duration});
+            session.transfer(flowFile, REL_ORIGINAL);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
new file mode 100644
index 0000000..647f04e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.solr.common.params.MultiMapSolrParams;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helpers for turning a "k1=v1&k2=v2" style property value into Solr request
+ * parameters, and for validating such a value.
+ */
+public class RequestParamsUtil {
+
+    /**
+     * Parses a String of request params into a MultiMap.
+     *
+     * Pairs are separated by '&amp;'. Within a pair only the first '='
+     * separates the key from the value, so a value may itself contain '='
+     * (for example <code>fq=price=10</code>). Keys and values are trimmed. A
+     * key followed by '=' and nothing else yields an empty value. Repeated
+     * keys accumulate multiple values.
+     *
+     * @param requestParams
+     *          the value of the request params property
+     * @return the parsed parameters; empty when the input is null or blank
+     * @throws IllegalStateException
+     *          if the input holds no pairs or a pair has no '='
+     */
+    public static MultiMapSolrParams parse(final String requestParams) {
+        final Map<String,String[]> paramsMap = new HashMap<>();
+        if (requestParams == null || requestParams.trim().isEmpty()) {
+            return new MultiMapSolrParams(paramsMap);
+        }
+
+        // split drops trailing empty strings, so input made only of '&' gives an empty array
+        final String[] params = requestParams.split("[&]");
+        if (params.length == 0) {
+            throw new IllegalStateException(
+                    "Parameters must be in form k1=v1&k2=v2, was " + requestParams);
+        }
+
+        for (final String param : params) {
+            // limit of 2: split on the first '=' only and keep the rest as the value
+            final String[] keyVal = param.split("=", 2);
+            if (keyVal.length != 2) {
+                throw new IllegalStateException(
+                        "Parameter must be in form key=value, was " + param);
+            }
+
+            final String key = keyVal[0].trim();
+            final String val = keyVal[1].trim();
+            MultiMapSolrParams.addParam(key, val, paramsMap);
+        }
+
+        return new MultiMapSolrParams(paramsMap);
+    }
+
+    /**
+     * Creates a property validator for a request params string.
+     *
+     * @return valid if the input parses successfully, invalid otherwise
+     */
+    public static Validator getValidator() {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(String subject, String input, ValidationContext context) {
+                try {
+                    RequestParamsUtil.parse(input);
+                    return new ValidationResult.Builder().subject(subject).input(input)
+                            .explanation("Valid Params").valid(true).build();
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(input)
+                            .explanation("Invalid Params: " + e.getMessage()).valid(false).build();
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
new file mode 100644
index 0000000..f286a1a
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A base class for processors that interact with Apache Solr.
+ *
+ */
+public abstract class SolrProcessor extends AbstractProcessor {
+
+    public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
+            "Cloud", "Cloud", "A SolrCloud instance.");
+
+    public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
+            "Standard", "Standard", "A stand-alone Solr instance.");
+
+    public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
+            .Builder().name("Solr Type")
+            .description("The type of Solr instance, Cloud or Standard.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
+            .defaultValue(SOLR_TYPE_STANDARD.getValue())
+            .build();
+
+    public static final PropertyDescriptor SOLR_LOCATION = new 
PropertyDescriptor
+            .Builder().name("Solr Location")
+            .description("The Solr url for a Solr Type of Standard, " +
+                    "or the ZooKeeper hosts for a Solr Type of Cloud.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_COLLECTION = new 
PropertyDescriptor
+            .Builder().name("Default Collection")
+            .description("The Solr collection name, only used with a Solr Type 
of Cloud")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private volatile SolrClient solrServer;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws 
IOException {
+        this.solrServer = createSolrServer(context);
+        additionalOnScheduled(context);
+    }
+
+    /**
+     * Create a SolrServer based on the type of Solr specified.
+     *
+     * @param context
+     *          The context
+     * @return an HttpSolrServer or CloudSolrServer
+     */
+    protected SolrClient createSolrServer(final ProcessContext context) {
+        if 
(SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
+            return new 
HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue());
+        } else {
+            CloudSolrClient cloudSolrServer = new CloudSolrClient(
+                    context.getProperty(SOLR_LOCATION).getValue());
+            cloudSolrServer.setDefaultCollection(
+                    context.getProperty(DEFAULT_COLLECTION).getValue());
+            return cloudSolrServer;
+        }
+    }
+
+    /**
+     * Returns the {@link org.apache.solr.client.solrj.SolrClient} that was 
created by the
+     * {@link #createSolrServer(org.apache.nifi.processor.ProcessContext)} 
method
+     *
+     * @return
+     */
+    protected final SolrClient getSolrServer() {
+        return solrServer;
+    }
+
+    /**
+     * Allows additional action to be taken during scheduling of processor.
+     *
+     * @param context
+     *          The context
+     */
+    protected void additionalOnScheduled(final ProcessContext context) {
+
+    }
+
+    @Override
+    protected final Collection<ValidationResult> 
customValidate(ValidationContext context) {
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) 
{
+            final String collection = 
context.getProperty(DEFAULT_COLLECTION).getValue();
+            if (collection == null || collection.trim().isEmpty()) {
+                problems.add(new ValidationResult.Builder()
+                        .subject(DEFAULT_COLLECTION.getName())
+                        .input(collection).valid(false)
+                        .explanation("A collection must specified for Solr 
Type of Cloud")
+                        .build());
+            }
+        }
+
+        Collection<ValidationResult> otherProblems = 
this.additionalCustomValidation(context);
+        if (otherProblems != null) {
+            problems.addAll(otherProblems);
+        }
+
+        return problems;
+    }
+
+    /**
+     * Allows additional custom validation to be done. This will be called from
+     * the parent's customValidation method.
+     *
+     * @param context
+     *            The context
+     * @return Validation results indicating problems
+     */
+    protected Collection<ValidationResult> 
additionalCustomValidation(ValidationContext context) {
+        return new ArrayList<>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..657d0e8
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.solr.PutSolrContentStream
+org.apache.nifi.processors.solr.GetSolr

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java
new file mode 100644
index 0000000..555c875
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Test helper that builds EmbeddedSolrServer instances.
+ */
+public class EmbeddedSolrServerFactory {
+
+    public static final String DEFAULT_SOLR_HOME = "src/test/resources/solr";
+    public static final String DEFAULT_CORE_HOME = "src/test/resources/";
+    public static final String DEFAULT_DATA_DIR = "target";
+
+    /**
+     * Creates the named core using the default Solr home, core home and data
+     * directory.
+     *
+     * @param coreName
+     *              the name of the core to load
+     * @return an EmbeddedSolrServer for the given core
+     */
+    public static SolrClient create(String coreName) throws IOException {
+        return create(DEFAULT_SOLR_HOME, DEFAULT_CORE_HOME, coreName, DEFAULT_DATA_DIR);
+    }
+
+    /**
+     * Creates the named core inside a freshly loaded CoreContainer. When a
+     * data dir is given, any existing data directory for the core is deleted
+     * first.
+     *
+     * @param solrHome
+     *              path to directory where solr.xml lives
+     * @param coreHome
+     *              parent directory of the core's instance directory
+     * @param coreName
+     *              the name of the core to load
+     * @param dataDir
+     *              the data dir for the core, or null to leave it unset
+     *
+     * @return an EmbeddedSolrServer for the given core
+     */
+    public static SolrClient create(String solrHome, String coreHome, String coreName, String dataDir)
+            throws IOException {
+
+        final Properties coreProps = new Properties();
+        if (dataDir != null) {
+            final String coreDataPath = dataDir + "/" + coreName;
+            final File existingData = new File(coreDataPath);
+            if (existingData.exists()) {
+                FileUtils.deleteDirectory(existingData);
+            }
+            coreProps.setProperty("dataDir", coreDataPath);
+        }
+
+        final CoreContainer container = new CoreContainer(solrHome);
+        container.load();
+
+        final String instanceDir = new File(coreHome, coreName).getAbsolutePath();
+        final CoreDescriptor coreDescriptor = new CoreDescriptor(container, coreName, instanceDir, coreProps);
+        container.create(coreDescriptor);
+
+        return new EmbeddedSolrServer(container, coreName);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
new file mode 100644
index 0000000..5a1373e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.junit.Assert;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for RequestParamsUtil.parse.
+ */
+public class RequestParamsUtilTest {
+
+    /** Three well-formed pairs parse to their values. */
+    @Test
+    public void testSimpleParse() {
+        assertOneTwoThree(RequestParamsUtil.parse("a=1&b=2&c=3"));
+    }
+
+    /** Whitespace around keys and values is trimmed away. */
+    @Test
+    public void testParseWithSpaces() {
+        assertOneTwoThree(RequestParamsUtil.parse("a = 1 &b= 2& c= 3 "));
+    }
+
+    /** A pair without '=' is rejected. */
+    @Test(expected = IllegalStateException.class)
+    public void testMalformedParamsParse() {
+        RequestParamsUtil.parse("a=1&b&c=3");
+    }
+
+    // shared expectation: a -> 1, b -> 2, c -> 3
+    private static void assertOneTwoThree(final MultiMapSolrParams parsed) {
+        Assert.assertEquals("1", parsed.get("a"));
+        Assert.assertEquals("2", parsed.get("b"));
+        Assert.assertEquals("3", parsed.get("c"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
new file mode 100644
index 0000000..52eb06b
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestGetSolr {
+
+    // core name passed to EmbeddedSolrServerFactory.create(...) in setup()
+    static final String DEFAULT_SOLR_CORE = "testCollection";
+
+    // embedded client created in setup() and handed to the processor under test
+    private SolrClient solrClient;
+
+    /**
+     * Starts an embedded Solr server for the test core and indexes five documents
+     * that all share the last name "smith".
+     */
+    @Before
+    public void setup() {
+        // create the conf dir if it doesn't exist; teardown() expects to find files in it
+        final File confDir = new File("conf");
+        assertTrue("Failed to create " + confDir.getAbsolutePath(),
+                confDir.exists() || confDir.mkdir());
+
+        try {
+            // create an EmbeddedSolrServer for the processor to use
+            final String relPath = getClass().getProtectionDomain().getCodeSource()
+                    .getLocation().getFile() + "../../target";
+
+            solrClient = EmbeddedSolrServerFactory.create(
+                    EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
+                    EmbeddedSolrServerFactory.DEFAULT_CORE_HOME,
+                    DEFAULT_SOLR_CORE, relPath);
+
+            // create some test documents, one per first name
+            final List<SolrInputDocument> docs = new ArrayList<>();
+            for (String first : new String[] {"bob", "alice", "mike", "john", "joan"}) {
+                final SolrInputDocument doc = new SolrInputDocument();
+                doc.addField("first", first);
+                doc.addField("last", "smith");
+                doc.addField("created", new Date());
+                docs.add(doc);
+            }
+
+            // add the test data to the index
+            for (SolrInputDocument doc : docs) {
+                solrClient.add(doc);
+            }
+            solrClient.commit();
+        } catch (Exception e) {
+            // keep the cause so the real failure and its stack trace reach the test report
+            throw new AssertionError("Failed to set up embedded Solr: " + e, e);
+        }
+    }
+
+    /**
+     * Deletes the conf directory together with the files left in it, then shuts
+     * down the embedded Solr client.
+     */
+    @After
+    public void teardown() {
+        try {
+            File confDir = new File("conf");
+            assertTrue(confDir.exists());
+            File[] files = confDir.listFiles();
+            // listFiles() returns null when the path cannot be listed as a directory
+            Assert.assertNotNull("Could not list " + confDir.getAbsolutePath(), files);
+            assertTrue(files.length > 0);
+            for (File file : files) {
+                assertTrue("Failed to delete " + file.getName(), file.delete());
+            }
+            assertTrue(confDir.delete());
+        } finally {
+            // always release the embedded server, even when a cleanup assertion fails;
+            // the client is null when setup() failed before creating it
+            if (solrClient != null) {
+                try {
+                    solrClient.shutdown();
+                } catch (Exception e) {
+                    // best-effort: a shutdown problem must not mask the test result
+                }
+            }
+        }
+    }
+
+    /**
+     * Seeds a last-end-date file 30 minutes in the past, then queries the five
+     * "smith" documents with a batch size of 2 and expects three flow files.
+     */
+    @Test
+    public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles()
+            throws IOException, SolrServerException {
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // setup a lastEndDate file to simulate picking up from a previous end date
+        SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US);
+        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        Calendar cal = new GregorianCalendar();
+        cal.add(Calendar.MINUTE, -30);
+        final String lastEndDate = sdf.format(cal.getTime());
+
+        File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier());
+        try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
+            Properties props = new Properties();
+            props.setProperty(GetSolr.LAST_END_DATE, lastEndDate);
+            props.store(fos, "GetSolr LastEndDate value");
+        } catch (IOException e) {
+            Assert.fail("Failed to setup last end date value: " + e.getMessage());
+        }
+
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created");
+        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3);
+    }
+
+    /**
+     * Queries the five "smith" documents with a batch size of 10 and expects a
+     * single flow file.
+     */
+    @Test
+    public void testLessThanBatchSizeShouldProduceOneFlowFile()
+            throws IOException, SolrServerException {
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "created");
+        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "10");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
+    }
+
+    /**
+     * Runs a query for a last name that setup() never indexed ("xyz") and
+     * expects no flow files at all.
+     */
+    @Test
+    public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException {
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "created");
+        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "10");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
+    }
+
+
+    /**
+     * GetSolr variant whose createSolrServer returns the SolrClient given to the
+     * constructor instead of building one from the processor properties.
+     */
+    private class TestableProcessor extends GetSolr {
+
+        private final SolrClient injectedClient;
+
+        public TestableProcessor(SolrClient solrClient) {
+            injectedClient = solrClient;
+        }
+
+        @Override
+        protected SolrClient createSolrServer(ProcessContext context) {
+            return injectedClient;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
new file mode 100644
index 0000000..8800978
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.*;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Test for PutSolr processor.
+ */
+public class TestPutSolrContentStream {
+
+    static final String DEFAULT_SOLR_CORE = "testCollection";
+
+    // input files, one per content format exercised by the update tests
+    static final String CUSTOM_JSON_SINGLE_DOC_FILE =
+            "src/test/resources/testdata/test-custom-json-single-doc.json";
+    static final String SOLR_JSON_MULTIPLE_DOCS_FILE =
+            "src/test/resources/testdata/test-solr-json-multiple-docs.json";
+    static final String CSV_MULTIPLE_DOCS_FILE =
+            "src/test/resources/testdata/test-csv-multiple-docs.csv";
+    static final String XML_MULTIPLE_DOCS_FILE =
+            "src/test/resources/testdata/test-xml-multiple-docs.xml";
+
+    // the two documents every update test expects to find in the index
+    static final SolrDocument expectedDoc1 = new SolrDocument();
+    static final SolrDocument expectedDoc2 = new SolrDocument();
+    static {
+        fillExpectedDoc(expectedDoc1, "Math", 90);
+        fillExpectedDoc(expectedDoc2, "Biology", 86);
+    }
+
+    /** Adds the shared John Doe fields plus the given subject and marks to the document. */
+    private static void fillExpectedDoc(SolrDocument doc, String subject, int marks) {
+        doc.addField("first", "John");
+        doc.addField("last", "Doe");
+        doc.addField("grade", 8);
+        doc.addField("subject", subject);
+        doc.addField("test", "term1");
+        doc.addField("marks", marks);
+    }
+
+    /**
+     * Creates a base TestRunner for the given processor with the Solr Type set to
+     * standard and a localhost Solr location. The content stream path and any
+     * request parameters are left for the individual tests to set.
+     */
+    private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) {
+        TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutSolrContentStream.SOLR_TYPE,
+                PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+        return runner;
+    }
+
+    /**
+     * Sends the Solr JSON file to /update/json/docs with json.command=false and
+     * expects both documents to end up in the index.
+     */
+    @Test
+    public void testUpdateWithSolrJson() throws IOException, SolrServerException {
+        final EmbeddedSolrServerProcessor processor =
+                new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
+
+        final TestRunner testRunner = createDefaultTestRunner(processor);
+        testRunner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs");
+        testRunner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "json.command=false");
+
+        try (FileInputStream jsonInput = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
+            testRunner.enqueue(jsonInput);
+            testRunner.run();
+
+            testRunner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+
+            verifySolrDocuments(processor.getSolrServer(),
+                    Arrays.asList(expectedDoc1, expectedDoc2));
+        } finally {
+            try {
+                processor.getSolrServer().shutdown();
+            } catch (Exception e) {
+                // best-effort shutdown of the embedded server
+            }
+        }
+    }
+
+    /**
+     * Sends a custom JSON document to /update/json/docs, using split and field
+     * mapping parameters, and expects the two exam documents in the index.
+     */
+    @Test
+    public void testUpdateWithCustomJson() throws IOException, SolrServerException {
+        final EmbeddedSolrServerProcessor processor =
+                new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
+
+        // split on /exams and map each JSON path to a Solr field
+        final String requestParams = "split=/exams"
+                + "&f=first:/first"
+                + "&f=last:/last"
+                + "&f=grade:/grade"
+                + "&f=subject:/exams/subject"
+                + "&f=test:/exams/test"
+                + "&f=marks:/exams/marks";
+
+        final TestRunner testRunner = createDefaultTestRunner(processor);
+        testRunner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs");
+        testRunner.setProperty(PutSolrContentStream.REQUEST_PARAMS, requestParams);
+
+        try (FileInputStream jsonInput = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
+            testRunner.enqueue(jsonInput);
+            testRunner.run();
+
+            testRunner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+
+            verifySolrDocuments(processor.getSolrServer(),
+                    Arrays.asList(expectedDoc1, expectedDoc2));
+        } finally {
+            try {
+                processor.getSolrServer().shutdown();
+            } catch (Exception e) {
+                // best-effort shutdown of the embedded server
+            }
+        }
+    }
+
+    /**
+     * Sends the CSV file to /update/csv with explicit field names and expects
+     * both documents to end up in the index.
+     */
+    @Test
+    public void testUpdateWithCsv() throws IOException, SolrServerException {
+        final EmbeddedSolrServerProcessor processor =
+                new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
+
+        final TestRunner testRunner = createDefaultTestRunner(processor);
+        testRunner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/csv");
+        testRunner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
+                "fieldnames=first,last,grade,subject,test,marks");
+
+        try (FileInputStream csvInput = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
+            testRunner.enqueue(csvInput);
+            testRunner.run();
+
+            testRunner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+
+            verifySolrDocuments(processor.getSolrServer(),
+                    Arrays.asList(expectedDoc1, expectedDoc2));
+        } finally {
+            try {
+                processor.getSolrServer().shutdown();
+            } catch (Exception e) {
+                // best-effort shutdown of the embedded server
+            }
+        }
+    }
+
+    /**
+     * Sends the XML file to /update with content type application/xml and
+     * expects both documents to end up in the index.
+     */
+    @Test
+    public void testUpdateWithXml() throws IOException, SolrServerException {
+        final EmbeddedSolrServerProcessor processor =
+                new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
+
+        final TestRunner testRunner = createDefaultTestRunner(processor);
+        testRunner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update");
+        testRunner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml");
+
+        try (FileInputStream xmlInput = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
+            testRunner.enqueue(xmlInput);
+            testRunner.run();
+
+            testRunner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+
+            verifySolrDocuments(processor.getSolrServer(),
+                    Arrays.asList(expectedDoc1, expectedDoc2));
+        } finally {
+            try {
+                processor.getSolrServer().shutdown();
+            } catch (Exception e) {
+                // best-effort shutdown of the embedded server
+            }
+        }
+    }
+
+    /**
+     * A SolrServerException thrown by the client is expected to route the flow
+     * file to the connection failure relationship after exactly one request.
+     */
+    @Test
+    public void testSolrServerExceptionShouldRouteToConnectionFailure()
+            throws IOException, SolrServerException {
+        final ExceptionThrowingProcessor processor = new ExceptionThrowingProcessor(
+                new SolrServerException("Error communicating with Solr"));
+
+        final TestRunner testRunner = createDefaultTestRunner(processor);
+
+        try (FileInputStream input = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
+            testRunner.enqueue(input);
+            testRunner.run();
+
+            testRunner.assertAllFlowFilesTransferred(
+                    PutSolrContentStream.REL_CONNECTION_FAILURE, 1);
+            verify(processor.getSolrServer(), times(1)).request(any(SolrRequest.class));
+        }
+    }
+
+    /**
+     * A SolrException (BAD_REQUEST) thrown by the client is expected to route
+     * the flow file to the failure relationship after exactly one request.
+     */
+    @Test
+    public void testSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
+        final ExceptionThrowingProcessor processor = new ExceptionThrowingProcessor(
+                new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error"));
+
+        final TestRunner testRunner = createDefaultTestRunner(processor);
+
+        try (FileInputStream input = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
+            testRunner.enqueue(input);
+            testRunner.run();
+
+            testRunner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
+            verify(processor.getSolrServer(), times(1)).request(any(SolrRequest.class));
+        }
+    }
+
+    @Test
+    public void testRemoteSolrExceptionShouldRouteToFailure() throws 
IOException, SolrServerException {
+        final Throwable throwable = new HttpSolrClient.RemoteSolrException(
+                "host", 401, "error", new NumberFormatException());
+        final ExceptionThrowingProcessor proc = new 
ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+
+        try (FileInputStream fileIn = new 
FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
+            runner.enqueue(fileIn);
+            runner.run();
+
+            
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
+            verify(proc.getSolrServer(), 
times(1)).request(any(SolrRequest.class));
+        }
+    }
+
+
+    /**
+     * With the cloud Solr type the configuration is expected to be invalid until
+     * a default collection is set.
+     */
+    @Test
+    public void testSolrTypeCloudShouldRequireCollection() {
+        final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
+        runner.setProperty(PutSolrContentStream.SOLR_TYPE,
+                PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.assertNotValid();
+
+        runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, "someCollection1");
+        runner.assertValid();
+    }
+
+    /**
+     * With the standard Solr type the configuration is expected to be valid
+     * without a default collection.
+     */
+    @Test
+    public void testSolrTypeStandardShouldNotRequireCollection() {
+        final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
+        runner.setProperty(PutSolrContentStream.SOLR_TYPE,
+                PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.assertValid();
+    }
+
+    /**
+     * Request parameters containing a key without a value ("a=1&b") are expected
+     * to make the configuration invalid.
+     */
+    @Test
+    public void testRequestParamsShouldBeInvalid() {
+        final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
+        runner.setProperty(PutSolrContentStream.SOLR_TYPE,
+                PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "a=1&b");
+        runner.assertNotValid();
+    }
+
+    /**
+     * Overrides the createSolrServer method to inject a Mockito mock whose
+     * request(...) call throws the Throwable given to the constructor.
+     */
+    private class ExceptionThrowingProcessor extends PutSolrContentStream {
+
+        private final Throwable toThrow;
+        private SolrClient mockSolrServer;
+
+        public ExceptionThrowingProcessor(Throwable throwable) {
+            toThrow = throwable;
+        }
+
+        @Override
+        protected SolrClient createSolrServer(ProcessContext context) {
+            final SolrClient mock = Mockito.mock(SolrClient.class);
+            mockSolrServer = mock;
+            try {
+                when(mock.request(any(SolrRequest.class))).thenThrow(toThrow);
+            } catch (SolrServerException | IOException e) {
+                // stubbing the mock should never actually throw; fail loudly if it does
+                Assert.fail(e.getMessage());
+            }
+            return mock;
+        }
+
+    }
+
+    /**
+     * Overrides the createSolrServer method to create an EmbeddedSolrServer for
+     * the core name given to the constructor.
+     */
+    private class EmbeddedSolrServerProcessor extends PutSolrContentStream {
+
+        private final String coreName;
+        private SolrClient embeddedSolrServer;
+
+        public EmbeddedSolrServerProcessor(String coreName) {
+            this.coreName = coreName;
+        }
+
+        @Override
+        protected SolrClient createSolrServer(ProcessContext context) {
+            // location of the compiled classes, followed by "../../target"
+            final String codeLocation = getClass().getProtectionDomain()
+                    .getCodeSource().getLocation().getFile();
+            final String relPath = codeLocation + "../../target";
+
+            try {
+                embeddedSolrServer = EmbeddedSolrServerFactory.create(
+                        EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
+                        EmbeddedSolrServerFactory.DEFAULT_CORE_HOME,
+                        coreName, relPath);
+            } catch (IOException e) {
+                Assert.fail(e.getMessage());
+            }
+            return embeddedSolrServer;
+        }
+
+    }
+
+    /**
+     * Verify that given SolrServer contains the expected SolrDocuments.
+     *
+     * Commits, queries for all documents, checks that the total count equals the
+     * number of expected documents, and then requires that every expected document
+     * is matched by some returned document on ALL of its fields.
+     *
+     * @param solrServer the client to commit to and query
+     * @param expectedDocuments the documents that must be present
+     * @throws IOException if the commit or query fails on I/O
+     * @throws SolrServerException if the commit or query fails inside Solr
+     */
+    private static void verifySolrDocuments(SolrClient solrServer,
+            Collection<SolrDocument> expectedDocuments)
+            throws IOException, SolrServerException {
+
+        solrServer.commit();
+
+        SolrQuery query = new SolrQuery("*:*");
+        QueryResponse qResponse = solrServer.query(query);
+        Assert.assertEquals(expectedDocuments.size(), qResponse.getResults().getNumFound());
+
+        // verify documents have expected fields and values
+        for (SolrDocument expectedDoc : expectedDocuments) {
+            boolean found = false;
+            for (SolrDocument solrDocument : qResponse.getResults()) {
+                if (hasAllExpectedFields(expectedDoc, solrDocument)) {
+                    found = true;
+                    break;
+                }
+            }
+            Assert.assertTrue("Could not find " + expectedDoc, found);
+        }
+    }
+
+    /**
+     * Returns true only when every field of expectedDoc has an equal first value in
+     * actualDoc. The earlier inline check overwrote its flag on each field, so only
+     * the last field compared decided the outcome.
+     */
+    private static boolean hasAllExpectedFields(SolrDocument expectedDoc, SolrDocument actualDoc) {
+        for (String expectedField : expectedDoc.getFieldNames()) {
+            Object expectedVal = expectedDoc.getFirstValue(expectedField);
+            Object actualVal = actualDoc.getFirstValue(expectedField);
+            // NOTE(review): equals() is type-sensitive (Integer vs Long vs String); confirm
+            // the test schema returns numeric fields with the types used in the expected docs
+            boolean same = expectedVal == null ? actualVal == null : expectedVal.equals(actualVal);
+            if (!same) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties
new file mode 100644
index 0000000..4a3bdd3
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties
@@ -0,0 +1,14 @@
+#  Logging level
+solr.log=logs/
+log4j.rootLogger=INFO, CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n
+
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.apache.hadoop=WARN
+
+# set to INFO to enable infostream log messages
+log4j.logger.org.apache.solr.update.LoggingInfoStream=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml
new file mode 100644
index 0000000..86fb3db
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Solr configuration shipped with the test resources. Every value is written as
+  ${property:default}; presumably Solr substitutes the named property and falls
+  back to the text after the colon (confirm against the Solr reference guide).
+-->
+<solr>
+
+  <!-- SolrCloud settings: host, port, context, ZooKeeper client timeout, node naming -->
+  <solrcloud>
+    <str name="host">${host:}</str>
+    <int name="hostPort">${jetty.port:8983}</int>
+    <str name="hostContext">${hostContext:solr}</str>
+    <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
+    <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
+  </solrcloud>
+
+  <!-- Shard handler factory with its socket and connection timeouts -->
+  <shardHandlerFactory name="shardHandlerFactory"
+    class="HttpShardHandlerFactory">
+    <int name="socketTimeout">${socketTimeout:0}</int>
+    <int name="connTimeout">${connTimeout:0}</int>
+  </shardHandlerFactory>
+
+</solr>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json
new file mode 100644
index 0000000..e7ada3f
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json
@@ -0,0 +1,3 @@
+{
+  "initArgs":{},
+  "managedList":[]}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/71b6ffc9/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt
new file mode 100644
index 0000000..2c164c0
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# a couple of test stopwords to test that the words are really being
+# configured from this file:
+stopworda
+stopwordb
+
+# Standard english stop words taken from Lucene's StopAnalyzer
+a
+an
+and
+are
+as
+at
+be
+but
+by
+for
+if
+in
+into
+is
+it
+no
+not
+of
+on
+or
+such
+that
+the
+their
+then
+there
+these
+they
+this
+to
+was
+will
+with

Reply via email to