Repository: nifi
Updated Branches:
  refs/heads/master fbd299e88 -> e27c2556d


NIFI-1594: Add support to expression language to define index operation

The index operation should be one of "index", "update", or "upsert".

This allows the operation to be defined by the incoming flowfile.

Signed-off-by: João Henrique Ferreira de Freitas <joa...@gmail.com>
Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #255


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e27c2556
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e27c2556
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e27c2556

Branch: refs/heads/master
Commit: e27c2556db17dea2b5adce8ed4291d92595fe6a7
Parents: fbd299e
Author: João Henrique Ferreira de Freitas <joa...@gmail.com>
Authored: Wed Feb 24 14:44:34 2016 -0300
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Fri Jun 17 11:22:47 2016 -0400

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearch.java         | 28 ++++++++++++++++--
 .../elasticsearch/TestPutElasticsearch.java     | 30 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e27c2556/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 6452bc7..5075586 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -100,6 +100,16 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
                     AttributeExpression.ResultType.STRING, true))
             .build();
 
+    public static final PropertyDescriptor INDEX_OP = new 
PropertyDescriptor.Builder()
+            .name("Index Operation")
+            .description("The type of the operation used to index (index, 
update, upsert)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .defaultValue("index")
+            .build();
+
     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
             .name("Batch Size")
             .description("The preferred number of FlowFiles to put to the 
database in a single transaction")
@@ -134,6 +144,7 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
         descriptors.add(TYPE);
         descriptors.add(CHARSET);
         descriptors.add(BATCH_SIZE);
+        descriptors.add(INDEX_OP);
 
         return Collections.unmodifiableList(descriptors);
     }
@@ -166,6 +177,7 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
             for (FlowFile file : flowFiles) {
                 final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
                 final String docType = 
context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
+                final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
 
                 final String id = file.getAttribute(id_attribute);
                 if (id == null) {
@@ -178,8 +190,20 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
                         public void process(final InputStream in) throws 
IOException {
                             String json = IOUtils.toString(in, charset)
                                     .replace("\r\n", " ").replace('\n', ' 
').replace('\r', ' ');
-                            bulk.add(esClient.get().prepareIndex(index, 
docType, id)
-                                    .setSource(json.getBytes(charset)));
+
+                            if (indexOp.equalsIgnoreCase("index")) {
+                                bulk.add(esClient.get().prepareIndex(index, 
docType, id)
+                                        .setSource(json.getBytes(charset)));
+                            } else if (indexOp.equalsIgnoreCase("upsert")) {
+                                bulk.add(esClient.get().prepareUpdate(index, 
docType, id)
+                                        .setDoc(json.getBytes(charset))
+                                        .setDocAsUpsert(true));
+                            } else if (indexOp.equalsIgnoreCase("update")) {
+                                bulk.add(esClient.get().prepareUpdate(index, 
docType, id)
+                                        .setDoc(json.getBytes(charset)));
+                            } else {
+                                throw new IOException("Index operation: " + 
indexOp + " not supported.");
+                            }
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e27c2556/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index fa2767b..ce25b81 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -248,6 +248,36 @@ public class TestPutElasticsearch {
         assertNotNull(out2);
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
+        runner.assertValid();
+
+        runner.setProperty(PutElasticsearch.INDEX_OP, "index_fail");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */

Reply via email to