Adding support for dynamic properties to PutSolrContentStream

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/5b3709fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/5b3709fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/5b3709fe

Branch: refs/heads/develop
Commit: 5b3709fe0c9a0ba416e97832bf0a9de2806358d4
Parents: bd6159e
Author: bbende <[email protected]>
Authored: Sun Apr 5 22:22:49 2015 -0400
Committer: bbende <[email protected]>
Committed: Sun Apr 5 22:22:49 2015 -0400

----------------------------------------------------------------------
 .../processors/solr/PutSolrContentStream.java   | 34 ++++----
 .../nifi/processors/solr/RequestParamsUtil.java | 85 --------------------
 .../additionalDetails.html                      | 48 +++++++++++
 .../processors/solr/RequestParamsUtilTest.java  | 49 -----------
 .../solr/TestPutSolrContentStream.java          | 30 +++----
 5 files changed, 74 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5b3709fe/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index 7cef7a5..89b510c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -79,14 +79,6 @@ public class PutSolrContentStream extends SolrProcessor {
             .expressionLanguageSupported(true)
             .build();
 
-    public static final PropertyDescriptor REQUEST_PARAMS = new 
PropertyDescriptor
-            .Builder().name("Request Parameters")
-            .description("Additional parameters to pass to Solr on each 
request, i.e. key1=val1&key2=val2")
-            .required(false)
-            .addValidator(RequestParamsUtil.getValidator())
-            .defaultValue("json.command=false&split=/&f=id:/field1")
-            .build();
-
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("The original FlowFile")
@@ -104,10 +96,10 @@ public class PutSolrContentStream extends SolrProcessor {
 
     public static final String COLLECTION_PARAM_NAME = "collection";
     public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
+    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
-    private volatile MultiMapSolrParams requestParams;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -120,7 +112,6 @@ public class PutSolrContentStream extends SolrProcessor {
         descriptors.add(CONTENT_STREAM_PATH);
         descriptors.add(CONTENT_TYPE);
         descriptors.add(COMMIT_WITHIN);
-        descriptors.add(REQUEST_PARAMS);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -151,12 +142,6 @@ public class PutSolrContentStream extends SolrProcessor {
                 .build();
     }
 
-    @OnScheduled
-    public void initializeRequestParams(ProcessContext context) {
-        final String requestParamsVal = 
context.getProperty(REQUEST_PARAMS).getValue();
-        this.requestParams = RequestParamsUtil.parse(requestParamsVal);
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -171,6 +156,8 @@ public class PutSolrContentStream extends SolrProcessor {
         final String collection = 
context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final Long commitWithin = 
context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
 
+        final MultiMapSolrParams requestParams = new 
MultiMapSolrParams(getRequestParams(context, flowFile));
+
         StopWatch timer = new StopWatch(true);
         session.read(flowFile, new InputStreamCallback() {
             @Override
@@ -250,6 +237,7 @@ public class PutSolrContentStream extends SolrProcessor {
     // get all of the dynamic properties and values into a Map for later 
adding to the Solr request
     private Map<String, String[]> getRequestParams(ProcessContext context, 
FlowFile flowFile) {
         final Map<String,String[]> paramsMap = new HashMap<>();
+        final SortedMap<String,String> repeatingParams = new TreeMap<>();
 
         for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
@@ -258,10 +246,22 @@ public class PutSolrContentStream extends SolrProcessor {
                 final String paramValue = 
context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
 
                 if (!paramValue.trim().isEmpty()) {
-                    MultiMapSolrParams.addParam(paramName, paramValue, 
paramsMap);
+                    if (paramName.matches(REPEATING_PARAM_PATTERN)) {
+                        repeatingParams.put(paramName, paramValue);
+                    } else {
+                        MultiMapSolrParams.addParam(paramName, paramValue, 
paramsMap);
+                    }
                 }
             }
         }
+
+        for (final Map.Entry<String,String> entry : 
repeatingParams.entrySet()) {
+            final String paramName = entry.getKey();
+            final String paramValue = entry.getValue();
+            final int idx = paramName.lastIndexOf(".");
+            MultiMapSolrParams.addParam(paramName.substring(0, idx), 
paramValue, paramsMap);
+        }
+
         return paramsMap;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5b3709fe/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
deleted file mode 100644
index 647f04e..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.solr.common.params.MultiMapSolrParams;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class RequestParamsUtil {
-
-    /**
-     * Parses a String of request params into a MultiMap.
-     *
-     * @param requestParams
-     *          the value of the request params property
-     * @return
-     */
-    public static MultiMapSolrParams parse(final String requestParams) {
-        final Map<String,String[]> paramsMap = new HashMap<>();
-        if (requestParams == null || requestParams.trim().isEmpty()) {
-            return new MultiMapSolrParams(paramsMap);
-        }
-
-        final String[] params = requestParams.split("[&]");
-        if (params == null || params.length == 0) {
-            throw new IllegalStateException(
-                    "Parameters must be in form k1=v1&k2=v2, was" + 
requestParams);
-        }
-
-        for (final String param : params) {
-            final String[] keyVal = param.split("=");
-            if (keyVal.length != 2) {
-                throw new IllegalStateException(
-                        "Parameter must be in form key=value, was " + param);
-            }
-
-            final String key = keyVal[0].trim();
-            final String val = keyVal[1].trim();
-            MultiMapSolrParams.addParam(key, val, paramsMap);
-        }
-
-        return new MultiMapSolrParams(paramsMap);
-    }
-
-    /**
-     * Creates a property validator for a request params string.
-     *
-     * @return valid if the input parses successfully, invalid otherwise
-     */
-    public static Validator getValidator() {
-        return new Validator() {
-            @Override
-            public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-                try {
-                    RequestParamsUtil.parse(input);
-                    return new 
ValidationResult.Builder().subject(subject).input(input)
-                            .explanation("Valid Params").valid(true).build();
-                } catch (final Exception e) {
-                    return new 
ValidationResult.Builder().subject(subject).input(input)
-                            .explanation("Invalid Params" + 
e.getMessage()).valid(false).build();
-                }
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5b3709fe/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html
new file mode 100644
index 0000000..054cdb6
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSolrContentStream</title>
+    <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>Usage Example</h2>
+<p>
+    This processor streams the contents of a FlowFile to an Apache Solr
+    update handler. Any properties added to this processor by the user are
+    passed to Solr on the update request. If a parameter must be sent multiple
+    times with different values, properties can follow a naming convention:
+    name.number, where name is the parameter name and number is a unique 
number.
+    Repeating parameters are sorted by property name as strings, so f.10 precedes f.2.
+</p>
+<p>
+    Example: To specify multiple 'f' parameters for indexing custom json, the 
following
+    properties can be defined:
+</p>
+<ul>
+    <li><strong>split</strong>: /exams</li>
+    <li><strong>f.1</strong>: first:/first</li>
+    <li><strong>f.2</strong>: last:/last</li>
+    <li><strong>f.3</strong>: grade:/grade</li>
+</ul>
+<p>
+    This will result in sending the following request parameters to Solr: <br/>
+    split=/exams&f=first:/first&f=last:/last&f=grade:/grade
+</p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5b3709fe/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
deleted file mode 100644
index 5a1373e..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import org.junit.Assert;
-import org.apache.solr.common.params.MultiMapSolrParams;
-import org.junit.Test;
-
-
-public class RequestParamsUtilTest {
-
-    @Test
-    public void testSimpleParse() {
-        MultiMapSolrParams map = RequestParamsUtil.parse("a=1&b=2&c=3");
-        Assert.assertEquals("1", map.get("a"));
-        Assert.assertEquals("2", map.get("b"));
-        Assert.assertEquals("3", map.get("c"));
-    }
-
-    @Test
-    public void testParseWithSpaces() {
-        MultiMapSolrParams map = RequestParamsUtil.parse("a = 1 &b= 2& c= 3 ");
-        Assert.assertEquals("1", map.get("a"));
-        Assert.assertEquals("2", map.get("b"));
-        Assert.assertEquals("3", map.get("c"));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testMalformedParamsParse() {
-        RequestParamsUtil.parse("a=1&b&c=3");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5b3709fe/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
index 98761c4..141fbb6 100644
--- 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
+++ 
b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -83,7 +83,7 @@ public class TestPutSolrContentStream {
 
         final TestRunner runner = createDefaultTestRunner(proc);
         runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update/json/docs");
-        
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false");
+        runner.setProperty("json.command", "false");
 
         try (FileInputStream fileIn = new 
FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
             runner.enqueue(fileIn);
@@ -105,14 +105,13 @@ public class TestPutSolrContentStream {
 
         final TestRunner runner = createDefaultTestRunner(proc);
         runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update/json/docs");
-        runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
-                "split=/exams" +
-                "&f=first:/first" +
-                "&f=last:/last" +
-                "&f=grade:/grade" +
-                "&f=subject:/exams/subject" +
-                "&f=test:/exams/test" +
-                "&f=marks:/exams/marks");
+        runner.setProperty("split", "/exams");
+        runner.setProperty("f.1", "first:/first");
+        runner.setProperty("f.2", "last:/last");
+        runner.setProperty("f.3", "grade:/grade");
+        runner.setProperty("f.4", "subject:/exams/subject");
+        runner.setProperty("f.5", "test:/exams/test");
+        runner.setProperty("f.6", "marks:/exams/marks");
 
         try (FileInputStream fileIn = new 
FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
             runner.enqueue(fileIn);
@@ -134,8 +133,7 @@ public class TestPutSolrContentStream {
 
         final TestRunner runner = createDefaultTestRunner(proc);
         runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, 
"/update/csv");
-        runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
-                "fieldnames=first,last,grade,subject,test,marks");
+        runner.setProperty("fieldnames", 
"first,last,grade,subject,test,marks");
 
         try (FileInputStream fileIn = new 
FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
             runner.enqueue(fileIn);
@@ -222,7 +220,6 @@ public class TestPutSolrContentStream {
         }
     }
 
-
     @Test
     public void testSolrTypeCloudShouldRequireCollection() {
         final TestRunner runner = 
TestRunners.newTestRunner(PutSolrContentStream.class);
@@ -242,15 +239,6 @@ public class TestPutSolrContentStream {
         runner.assertValid();
     }
 
-    @Test
-    public void testRequestParamsShouldBeInvalid() {
-        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrContentStream.class);
-        runner.setProperty(PutSolrContentStream.SOLR_TYPE, 
PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
-        runner.setProperty(PutSolrContentStream.SOLR_LOCATION, 
"http://localhost:8443/solr");
-        runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "a=1&b");
-        runner.assertNotValid();
-    }
-
     /**
      * Override the creatrSolrServer method to inject a Mock.
      */

Reply via email to