Addressing feedback from pull request, adding commitWithin parameter to 
PutSolrContentStream since all update handlers support it


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/23989609
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/23989609
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/23989609

Branch: refs/heads/develop
Commit: 23989609ae418bba7f18c573a8078ddecf0e8035
Parents: 041f543
Author: bbende <[email protected]>
Authored: Thu Mar 26 22:27:09 2015 -0400
Committer: bbende <[email protected]>
Committed: Thu Mar 26 22:27:09 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |   2 +-
 .../apache/nifi/processors/solr/GetSolr.java    |  16 +--
 .../processors/solr/PutSolrContentStream.java   | 100 +++++++++++++------
 .../nifi/processors/solr/SolrProcessor.java     |  39 +++-----
 .../nifi/processors/solr/TestGetSolr.java       |   4 +-
 .../solr/TestPutSolrContentStream.java          |  46 ++++-----
 6 files changed, 115 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/23989609/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 6682551..fd23f28 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -219,7 +219,7 @@
         
<nifi.content.repository.archive.enabled>false</nifi.content.repository.archive.enabled>
         
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
         <nifi.content.viewer.url />
-        
+
         <nifi.restore.directory />
         <nifi.ui.banner.text />
         <nifi.ui.autorefresh.interval>30 sec</nifi.ui.autorefresh.interval>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/23989609/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index d230cc1..0af6043 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -117,7 +117,7 @@ public class GetSolr extends SolrProcessor {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(SOLR_TYPE);
         descriptors.add(SOLR_LOCATION);
-        descriptors.add(DEFAULT_COLLECTION);
+        descriptors.add(COLLECTION);
         descriptors.add(SOLR_QUERY);
         descriptors.add(RETURN_FIELDS);
         descriptors.add(SORT_CLAUSE);
@@ -153,14 +153,6 @@ public class GetSolr extends SolrProcessor {
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         final ProcessorLog logger = getLogger();
-
-        final FlowFile incomingFlowFile = session.get();
-        if (incomingFlowFile != null) {
-            session.transfer(incomingFlowFile, REL_SUCCESS);
-            logger.warn("found FlowFile {} in input queue; transferring to 
success",
-                    new Object[]{incomingFlowFile});
-        }
-
         readLastEndDate();
 
         final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
@@ -201,7 +193,7 @@ public class GetSolr extends SolrProcessor {
         try {
             // run the initial query and send out the first page of results
             final StopWatch stopWatch = new StopWatch(true);
-            QueryResponse response = getSolrServer().query(solrQuery);
+            QueryResponse response = getSolrClient().query(solrQuery);
             stopWatch.stop();
 
             long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
@@ -218,7 +210,7 @@ public class GetSolr extends SolrProcessor {
                 StringBuilder transitUri = new StringBuilder("solr://");
                 
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
                 if 
(SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
-                    
transitUri.append("/").append(context.getProperty(DEFAULT_COLLECTION).getValue());
+                    
transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
                 }
 
                 session.getProvenanceReporter().receive(flowFile, 
transitUri.toString(), duration);
@@ -232,7 +224,7 @@ public class GetSolr extends SolrProcessor {
                         solrQuery.setStart(endRow);
 
                         stopWatch.start();
-                        response = getSolrServer().query(solrQuery);
+                        response = getSolrClient().query(solrQuery);
                         stopWatch.stop();
 
                         duration = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/23989609/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index 704d8a2..7cef7a5 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -18,9 +18,10 @@
  */
 package org.apache.nifi.processors.solr;
 
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -48,13 +49,16 @@ import java.util.concurrent.TimeUnit;
 
 @Tags({"Apache", "Solr", "Put", "Send"})
 @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to 
Solr")
+@DynamicProperty(name="A Solr request parameter name", value="A Solr request 
parameter value",
+        description="These parameters will be passed to Solr on the request")
 public class PutSolrContentStream extends SolrProcessor {
 
-    public static final PropertyDescriptor CONTENT_STREAM_URL = new 
PropertyDescriptor
-            .Builder().name("Content Stream URL")
-            .description("The URL in Solr to post the ContentStream")
+    public static final PropertyDescriptor CONTENT_STREAM_PATH = new 
PropertyDescriptor
+            .Builder().name("Content Stream Path")
+            .description("The path in Solr to post the ContentStream")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .defaultValue("/update/json/docs")
             .build();
 
@@ -63,9 +67,18 @@ public class PutSolrContentStream extends SolrProcessor {
             .description("Content-Type being sent to Solr")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .defaultValue("application/json")
             .build();
 
+    public static final PropertyDescriptor COMMIT_WITHIN = new 
PropertyDescriptor
+            .Builder().name("Commit Within")
+            .description("The number of milliseconds before the given update 
is committed")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
     public static final PropertyDescriptor REQUEST_PARAMS = new 
PropertyDescriptor
             .Builder().name("Request Parameters")
             .description("Additional parameters to pass to Solr on each 
request, i.e. key1=val1&key2=val2")
@@ -74,8 +87,8 @@ public class PutSolrContentStream extends SolrProcessor {
             .defaultValue("json.command=false&split=/&f=id:/field1")
             .build();
 
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
-            .name("original")
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
             .description("The original FlowFile")
             .build();
 
@@ -89,10 +102,8 @@ public class PutSolrContentStream extends SolrProcessor {
             .description("FlowFiles that failed because Solr is unreachable")
             .build();
 
-    /**
-     * The name of a FlowFile attribute used for specifying a Solr collection.
-     */
-    public static final String SOLR_COLLECTION_ATTR = "solr.collection";
+    public static final String COLLECTION_PARAM_NAME = "collection";
+    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
@@ -105,14 +116,15 @@ public class PutSolrContentStream extends SolrProcessor {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(SOLR_TYPE);
         descriptors.add(SOLR_LOCATION);
-        descriptors.add(DEFAULT_COLLECTION);
-        descriptors.add(CONTENT_STREAM_URL);
+        descriptors.add(COLLECTION);
+        descriptors.add(CONTENT_STREAM_PATH);
         descriptors.add(CONTENT_TYPE);
+        descriptors.add(COMMIT_WITHIN);
         descriptors.add(REQUEST_PARAMS);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_SUCCESS);
         relationships.add(REL_FAILURE);
         relationships.add(REL_CONNECTION_FAILURE);
         this.relationships = Collections.unmodifiableSet(relationships);
@@ -129,7 +141,18 @@ public class PutSolrContentStream extends SolrProcessor {
     }
 
     @Override
-    protected void additionalOnScheduled(ProcessContext context) {
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value to send for the '" + 
propertyDescriptorName + "' request parameter")
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .dynamic(true)
+                .expressionLanguageSupported(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void initializeRequestParams(ProcessContext context) {
         final String requestParamsVal = 
context.getProperty(REQUEST_PARAMS).getValue();
         this.requestParams = RequestParamsUtil.parse(requestParamsVal);
     }
@@ -143,17 +166,19 @@ public class PutSolrContentStream extends SolrProcessor {
 
         final ObjectHolder<SolrException> error = new ObjectHolder<>(null);
         final ObjectHolder<SolrServerException> connectionError = new 
ObjectHolder<>(null);
-        final ObjectHolder<String> collectionUsed = new ObjectHolder<>(null);
 
-        final String collectionAttrVal = 
flowFile.getAttribute(SOLR_COLLECTION_ATTR);
         final boolean isSolrCloud = 
SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
+        final String collection = 
context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final Long commitWithin = 
context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
 
         StopWatch timer = new StopWatch(true);
         session.read(flowFile, new InputStreamCallback() {
             @Override
             public void process(final InputStream in) throws IOException {
-                ContentStreamUpdateRequest request = new 
ContentStreamUpdateRequest(
-                        context.getProperty(CONTENT_STREAM_URL).getValue());
+                final String contentStreamPath = 
context.getProperty(CONTENT_STREAM_PATH)
+                        .evaluateAttributeExpressions().getValue();
+
+                ContentStreamUpdateRequest request = new 
ContentStreamUpdateRequest(contentStreamPath);
                 request.setParams(new ModifiableSolrParams());
 
                 // add the extra params, don't use 'set' in case of repeating 
params
@@ -165,14 +190,13 @@ public class PutSolrContentStream extends SolrProcessor {
                     }
                 }
 
-                // send the request to the specified collection, or to the 
default collection
+                // specify the collection for SolrCloud
                 if (isSolrCloud) {
-                    String collection = collectionAttrVal;
-                    if (StringUtils.isBlank(collection)) {
-                        collection = 
context.getProperty(DEFAULT_COLLECTION).getValue();
-                    }
-                    request.setParam("collection", collection);
-                    collectionUsed.set(collection);
+                    request.setParam(COLLECTION_PARAM_NAME, collection);
+                }
+
+                if (commitWithin != null && commitWithin > 0) {
+                    request.setParam(COMMIT_WITHIN_PARAM_NAME, 
commitWithin.toString());
                 }
 
                 try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(in)) {
@@ -185,11 +209,11 @@ public class PutSolrContentStream extends SolrProcessor {
 
                         @Override
                         public String getContentType() {
-                            return 
context.getProperty(CONTENT_TYPE).getValue();
+                            return 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
                         }
                     });
 
-                    UpdateResponse response = request.process(getSolrServer());
+                    UpdateResponse response = request.process(getSolrClient());
                     getLogger().debug("Got {} response from Solr", new 
Object[]{response.getStatus()});
                 } catch (SolrException e) {
                     error.set(e);
@@ -213,14 +237,32 @@ public class PutSolrContentStream extends SolrProcessor {
             StringBuilder transitUri = new StringBuilder("solr://");
             transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
             if (isSolrCloud) {
-                transitUri.append(":").append(collectionUsed.get());
+                transitUri.append(":").append(collection);
             }
 
             final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
             session.getProvenanceReporter().send(flowFile, 
transitUri.toString(), duration, true);
             getLogger().info("Successfully sent {} to Solr in {} millis", new 
Object[]{flowFile, duration});
-            session.transfer(flowFile, REL_ORIGINAL);
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
+
+    // get all of the dynamic properties and values into a Map for later 
adding to the Solr request
+    private Map<String, String[]> getRequestParams(ProcessContext context, 
FlowFile flowFile) {
+        final Map<String,String[]> paramsMap = new HashMap<>();
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                final String paramName = descriptor.getName();
+                final String paramValue = 
context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
+
+                if (!paramValue.trim().isEmpty()) {
+                    MultiMapSolrParams.addParam(paramName, paramValue, 
paramsMap);
+                }
+            }
         }
+        return paramsMap;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/23989609/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
index f286a1a..70b71a4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
@@ -51,7 +51,6 @@ public abstract class SolrProcessor extends AbstractProcessor 
{
             .Builder().name("Solr Type")
             .description("The type of Solr instance, Cloud or Standard.")
             .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
             .defaultValue(SOLR_TYPE_STANDARD.getValue())
             .build();
@@ -64,58 +63,48 @@ public abstract class SolrProcessor extends 
AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor DEFAULT_COLLECTION = new 
PropertyDescriptor
-            .Builder().name("Default Collection")
+    public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
+            .Builder().name("Collection")
             .description("The Solr collection name, only used with a Solr Type 
of Cloud")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
-    private volatile SolrClient solrServer;
+    private volatile SolrClient solrClient;
 
     @OnScheduled
     public final void onScheduled(final ProcessContext context) throws 
IOException {
-        this.solrServer = createSolrServer(context);
-        additionalOnScheduled(context);
+        this.solrClient = createSolrClient(context);
     }
 
     /**
-     * Create a SolrServer based on the type of Solr specified.
+     * Create a SolrClient based on the type of Solr specified.
      *
      * @param context
      *          The context
-     * @return an HttpSolrServer or CloudSolrServer
+     * @return an HttpSolrClient or CloudSolrClient
      */
-    protected SolrClient createSolrServer(final ProcessContext context) {
+    protected SolrClient createSolrClient(final ProcessContext context) {
         if 
(SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
             return new 
HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue());
         } else {
             CloudSolrClient cloudSolrServer = new CloudSolrClient(
                     context.getProperty(SOLR_LOCATION).getValue());
             cloudSolrServer.setDefaultCollection(
-                    context.getProperty(DEFAULT_COLLECTION).getValue());
+                    
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
             return cloudSolrServer;
         }
     }
 
     /**
      * Returns the {@link org.apache.solr.client.solrj.SolrClient} that was 
created by the
-     * {@link #createSolrServer(org.apache.nifi.processor.ProcessContext)} 
method
+     * {@link #createSolrClient(org.apache.nifi.processor.ProcessContext)} 
method
      *
      * @return
      */
-    protected final SolrClient getSolrServer() {
-        return solrServer;
-    }
-
-    /**
-     * Allows additional action to be taken during scheduling of processor.
-     *
-     * @param context
-     *          The context
-     */
-    protected void additionalOnScheduled(final ProcessContext context) {
-
+    protected final SolrClient getSolrClient() {
+        return solrClient;
     }
 
     @Override
@@ -123,10 +112,10 @@ public abstract class SolrProcessor extends 
AbstractProcessor {
         final List<ValidationResult> problems = new ArrayList<>();
 
         if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) 
{
-            final String collection = 
context.getProperty(DEFAULT_COLLECTION).getValue();
+            final String collection = 
context.getProperty(COLLECTION).getValue();
             if (collection == null || collection.trim().isEmpty()) {
                 problems.add(new ValidationResult.Builder()
-                        .subject(DEFAULT_COLLECTION.getName())
+                        .subject(COLLECTION.getName())
                         .input(collection).valid(false)
                         .explanation("A collection must specified for Solr 
Type of Cloud")
                         .build());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/23989609/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
index 52eb06b..dcae008 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java
@@ -183,7 +183,7 @@ public class TestGetSolr {
     }
 
 
-    // Override createSolrServer and return the passed in SolrClient
+    // Override createSolrClient and return the passed in SolrClient
     private class TestableProcessor extends GetSolr {
         private SolrClient solrClient;
 
@@ -191,7 +191,7 @@ public class TestGetSolr {
             this.solrClient = solrClient;
         }
         @Override
-        protected SolrClient createSolrServer(ProcessContext context) {
+        protected SolrClient createSolrClient(ProcessContext context) {
             return solrClient;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/23989609/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
index 8800978..98761c4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -82,7 +82,7 @@ public class TestPutSolrContentStream {
         final EmbeddedSolrServerProcessor proc = new 
EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
 
         final TestRunner runner = createDefaultTestRunner(proc);
-        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, 
"/update/json/docs");
+        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update/json/docs");
         
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false");
 
         try (FileInputStream fileIn = new 
FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
@@ -91,11 +91,11 @@ public class TestPutSolrContentStream {
             runner.run();
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
-            runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+            runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
 
-            verifySolrDocuments(proc.getSolrServer(), 
Arrays.asList(expectedDoc1, expectedDoc2));
+            verifySolrDocuments(proc.getSolrClient(), 
Arrays.asList(expectedDoc1, expectedDoc2));
         } finally {
-            try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
+            try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
         }
     }
 
@@ -104,7 +104,7 @@ public class TestPutSolrContentStream {
         final EmbeddedSolrServerProcessor proc = new 
EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
 
         final TestRunner runner = createDefaultTestRunner(proc);
-        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, 
"/update/json/docs");
+        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update/json/docs");
         runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
                 "split=/exams" +
                 "&f=first:/first" +
@@ -120,11 +120,11 @@ public class TestPutSolrContentStream {
             runner.run();
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
-            runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+            runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
 
-            verifySolrDocuments(proc.getSolrServer(), 
Arrays.asList(expectedDoc1, expectedDoc2));
+            verifySolrDocuments(proc.getSolrClient(), 
Arrays.asList(expectedDoc1, expectedDoc2));
         } finally {
-            try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
+            try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
         }
     }
 
@@ -133,7 +133,7 @@ public class TestPutSolrContentStream {
         final EmbeddedSolrServerProcessor proc = new 
EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
 
         final TestRunner runner = createDefaultTestRunner(proc);
-        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, 
"/update/csv");
+        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update/csv");
         runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
                 "fieldnames=first,last,grade,subject,test,marks");
 
@@ -143,11 +143,11 @@ public class TestPutSolrContentStream {
             runner.run();
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
-            runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+            runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
 
-            verifySolrDocuments(proc.getSolrServer(), 
Arrays.asList(expectedDoc1, expectedDoc2));
+            verifySolrDocuments(proc.getSolrClient(), 
Arrays.asList(expectedDoc1, expectedDoc2));
         } finally {
-            try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
+            try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
         }
     }
 
@@ -156,7 +156,7 @@ public class TestPutSolrContentStream {
         final EmbeddedSolrServerProcessor proc = new 
EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
 
         final TestRunner runner = createDefaultTestRunner(proc);
-        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update");
+        runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update");
         runner.setProperty(PutSolrContentStream.CONTENT_TYPE, 
"application/xml");
 
         try (FileInputStream fileIn = new 
FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
@@ -165,11 +165,11 @@ public class TestPutSolrContentStream {
             runner.run();
             runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
             
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
-            runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
+            runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
 
-            verifySolrDocuments(proc.getSolrServer(), 
Arrays.asList(expectedDoc1, expectedDoc2));
+            verifySolrDocuments(proc.getSolrClient(), 
Arrays.asList(expectedDoc1, expectedDoc2));
         } finally {
-            try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
+            try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
         }
     }
 
@@ -185,7 +185,7 @@ public class TestPutSolrContentStream {
             runner.run();
 
             
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE,
 1);
-            verify(proc.getSolrServer(), 
times(1)).request(any(SolrRequest.class));
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class));
         }
     }
 
@@ -201,7 +201,7 @@ public class TestPutSolrContentStream {
             runner.run();
 
             
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
-            verify(proc.getSolrServer(), 
times(1)).request(any(SolrRequest.class));
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class));
         }
     }
 
@@ -218,7 +218,7 @@ public class TestPutSolrContentStream {
             runner.run();
 
             
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
-            verify(proc.getSolrServer(), 
times(1)).request(any(SolrRequest.class));
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class));
         }
     }
 
@@ -230,7 +230,7 @@ public class TestPutSolrContentStream {
         runner.setProperty(PutSolrContentStream.SOLR_LOCATION, 
"http://localhost:8443/solr";);
         runner.assertNotValid();
 
-        runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, 
"someCollection1");
+        runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1");
         runner.assertValid();
     }
 
@@ -264,7 +264,7 @@ public class TestPutSolrContentStream {
         }
 
         @Override
-        protected SolrClient createSolrServer(ProcessContext context) {
+        protected SolrClient createSolrClient(ProcessContext context) {
             mockSolrServer = Mockito.mock(SolrClient.class);
             try {
                 
when(mockSolrServer.request(any(SolrRequest.class))).thenThrow(throwable);
@@ -279,7 +279,7 @@ public class TestPutSolrContentStream {
     }
 
     /**
-     * Override the createSolrServer method and create and EmbeddedSolrServer.
+     * Override the createSolrClient method and create and EmbeddedSolrServer.
      */
     private class EmbeddedSolrServerProcessor extends PutSolrContentStream {
 
@@ -291,7 +291,7 @@ public class TestPutSolrContentStream {
         }
 
         @Override
-        protected SolrClient createSolrServer(ProcessContext context) {
+        protected SolrClient createSolrClient(ProcessContext context) {
             try {
                 String relPath = getClass().getProtectionDomain()
                         .getCodeSource().getLocation().getFile()

Reply via email to