Added path home property and unit test to elasticsearch processor in support of 
the node client


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0c137bc2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0c137bc2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0c137bc2

Branch: refs/heads/master
Commit: 0c137bc22d18b23b121e42de0dd03297fffa2b5f
Parents: 943d0a6
Author: scarpacci <[email protected]>
Authored: Wed Dec 30 09:27:49 2015 -0800
Committer: Matt Burgess <[email protected]>
Committed: Tue Feb 2 17:26:40 2016 -0500

----------------------------------------------------------------------
 .../AbstractElasticsearchProcessor.java         | 32 ++++++-
 .../elasticsearch/PutElasticsearch.java         |  1 +
 .../elasticsearch/TestPutElasticsearch.java     | 90 ++++++++++++++++++--
 3 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0c137bc2/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
index 9573178..71a116b 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
@@ -81,6 +81,16 @@ public abstract class AbstractElasticsearchProcessor extends 
AbstractProcessor {
             .required(false)
             .addValidator(new ElasticsearchClientValidator())
             .build();
+
+    protected static final PropertyDescriptor PATH_HOME = new 
PropertyDescriptor.Builder()
+            .name("ElasticSearch Path Home")
+            .description("ElasticSearch node client requires that path.home be 
set. For example, "
+                        + "/usr/share/elasticsearch or 
/usr/local/opt/elasticsearch for homebrew intall "
+                        + 
"https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-dir-layout.html";)
+            .required(false)
+            .addValidator(new ElasticsearchClientValidator())
+            .build();
+
     protected static final PropertyDescriptor PING_TIMEOUT = new 
PropertyDescriptor.Builder()
             .name("ElasticSearch Ping Timeout")
             .description("The ping timeout used to determine when a node is 
unreachable.  " +
@@ -89,6 +99,7 @@ public abstract class AbstractElasticsearchProcessor extends 
AbstractProcessor {
             .defaultValue("5s")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
     protected static final PropertyDescriptor SAMPLER_INTERVAL = new 
PropertyDescriptor.Builder()
             .name("Sampler Interval")
             .description("Node sampler interval. For example, 5s (5 seconds) 
If non-local recommended is 30s")
@@ -144,7 +155,14 @@ public abstract class AbstractElasticsearchProcessor 
extends AbstractProcessor {
                 }
                 esClient = transportClient;
             } else if ("node".equals(clusterType)) {
-                esClient = 
NodeBuilder.nodeBuilder().clusterName(clusterName).node().client();
+
+                final String pathHome = 
context.getProperty(PATH_HOME).toString();
+                //create new node client
+                Settings settings = Settings.settingsBuilder()
+                        .put("path.home", pathHome)
+                        .build();
+
+                esClient = 
NodeBuilder.nodeBuilder().clusterName(clusterName).settings(settings).node().client();
             }
         } catch (Exception e) {
             log.error("Failed to create Elasticsearch client due to {}", new 
Object[]{e}, e);
@@ -205,7 +223,7 @@ public abstract class AbstractElasticsearchProcessor 
extends AbstractProcessor {
     }
 
     /**
-     * A custom validator for the Elasticsearch properties list. For example, 
the hostnames property doesn't need to
+     * A custom validator for the ElasticSearch properties list. For example, 
the hostnames property doesn't need to
      * be filled in for a Node client, as it joins the cluster by name. 
Alternatively if a Transport client
      */
     protected static class ElasticsearchClientValidator implements Validator {
@@ -220,6 +238,16 @@ public abstract class AbstractElasticsearchProcessor 
extends AbstractProcessor {
                             CLIENT_TYPE.getName(), 
clientTypeProperty.getValue(), context);
                 }
             }
+
+            // Only validate Path home if client type == Node
+            if (PATH_HOME.getName().equals(subject)) {
+                PropertyValue clientTypeProperty = 
context.getProperty(CLIENT_TYPE);
+                if 
(NODE_CLIENT.getValue().equals(clientTypeProperty.getValue())) {
+                    return StandardValidators.NON_EMPTY_VALIDATOR.validate(
+                            CLIENT_TYPE.getName(), 
clientTypeProperty.getValue(), context);
+                }
+            }
+
             return VALID.validate(subject, input, context);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c137bc2/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 003aa30..43f3a6b 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -116,6 +116,7 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
         descriptors.add(CLIENT_TYPE);
         descriptors.add(CLUSTER_NAME);
         descriptors.add(HOSTS);
+        descriptors.add(PATH_HOME);
         descriptors.add(PING_TIMEOUT);
         descriptors.add(SAMPLER_INTERVAL);
         descriptors.add(ID_ATTRIBUTE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c137bc2/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index 68a0a78..6af8fd2 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -21,8 +21,6 @@ import com.google.gson.JsonParser;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.MockProcessorInitializationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.elasticsearch.action.ActionListener;
@@ -32,11 +30,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.AbstractListenableActionFuture;
 import org.elasticsearch.action.support.AdapterActionFuture;
-import org.elasticsearch.action.support.PlainListenableActionFuture;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -125,6 +120,35 @@ public class TestPutElasticsearch {
         out.assertAttributeEquals("tweet_id", "28039652140");
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerNode() throws IOException {
+        runner = TestRunners.newTestRunner(new 
ElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node");
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+
+        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+        runner.enqueue(twitterExample, new HashMap<String, String>() {{
+            put("tweet_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("tweet_id", "28039652141");
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -197,14 +221,56 @@ public class TestPutElasticsearch {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearch());
         runner.setValidateExpressionUsage(false);
+
         //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, 
"transport");
         runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+
+        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+        runner.enqueue(twitterExample, new HashMap<String, String>() {{
+            put("tweet_id", "28039652140");
+        }});
+
+
+        runner.enqueue(twitterExample);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
+
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasicNode() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearch());
+        runner.setValidateExpressionUsage(false);
+
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node");
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, 
"/usr/local/opt/elasticsearch");
         runner.setProperty(PutElasticsearch.INDEX, "tweet");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
 
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+        runner.enqueue(twitterExample, new HashMap<String, String>() {{
+            put("tweet_id", "28039652141");
+        }});
 
         runner.enqueue(twitterExample);
         runner.run(1, true, true);
@@ -221,7 +287,9 @@ public class TestPutElasticsearch {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearch());
         runner.setValidateExpressionUsage(false);
+
         //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, 
"transport");
         runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
@@ -229,6 +297,11 @@ public class TestPutElasticsearch {
         runner.setProperty(PutElasticsearch.INDEX, "tweet");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
 
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+
         JsonParser parser = new JsonParser();
         JsonObject json;
         String message = convertStreamToString(twitterExample);
@@ -237,8 +310,11 @@ public class TestPutElasticsearch {
             json = parser.parse(message).getAsJsonObject();
             String id = json.get("id").getAsString();
             long newId = Long.parseLong(id) + i;
-            json.addProperty("id", newId);
-            runner.enqueue(message.getBytes());
+            final String newStrId = Long.toString(newId);
+            //json.addProperty("id", newId);
+            runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
+                put("tweet_id", newStrId);
+            }});
 
         }
 

Reply via email to