Repository: nifi
Updated Branches:
  refs/heads/master 8cf34c3ea -> 75af3a2eb


Initial commit for the elasticsearch bundle to Nifi


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e6cfcf40
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e6cfcf40
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e6cfcf40

Branch: refs/heads/master
Commit: e6cfcf40d0bcafa720449e163b8981958662c2d1
Parents: 8cf34c3
Author: scarpacci <[email protected]>
Authored: Tue Dec 22 10:10:45 2015 -0800
Committer: Matt Burgess <[email protected]>
Committed: Tue Feb 2 17:26:39 2016 -0500

----------------------------------------------------------------------
 .../nifi-elasticsearch-nar/pom.xml              |  24 ++
 .../src/main/resources/META-INF/NOTICE.txt      |  24 ++
 .../nifi-elasticsearch-processors/pom.xml       |  63 +++++
 .../AbstractElasticSearchProcessor.java         | 232 +++++++++++++++++++
 .../elasticsearch/PutElasticSearch.java         | 197 ++++++++++++++++
 .../org.apache.nifi.processor.Processor         |  15 ++
 .../elasticsearch/TestPutElasticSearch.java     | 103 ++++++++
 .../src/test/resources/TweetExample.json        |  83 +++++++
 .../nifi-elasticsearch-bundle/pom.xml           |  21 ++
 9 files changed, 762 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml
new file mode 100644
index 0000000..44492a5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>nifi-elasticsearch-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>0.4.1-SNAPSHOT</version>
+    </parent>
+
+    <!-- groupId and version are inherited from the parent (org.apache.nifi) -->
+    <artifactId>nifi-elasticsearch-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <!-- the processors packaged into this NAR; kept in lock-step with this module's version -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-elasticsearch-processors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..8c660af
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,24 @@
+nifi-elasticsearch-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2014 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2012 The Apache Software Foundation

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
new file mode 100644
index 0000000..3208ced
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>nifi-elasticsearch-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>0.4.1-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-elasticsearch-processors</artifactId>
+
+    <properties>
+        <slf4jversion>1.7.12</slf4jversion>
+        <es.version>1.7.1</es.version>
+        <gsonversion>2.4</gsonversion>
+        <jodatimeversion>2.9.1</jodatimeversion>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4jversion}</version>
+        </dependency>
+        <!-- an SLF4J binding is only needed to see log output in unit tests;
+             it should not be bundled into the NAR -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4jversion}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${es.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gsonversion}</version>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>${jodatimeversion}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java
new file mode 100644
index 0000000..6bfd37f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java
@@ -0,0 +1,232 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+import com.google.gson.*;
+
+public abstract class AbstractElasticSearchProcessor extends AbstractProcessor{
+
+    protected static final PropertyDescriptor CLUSTER_NAME = new 
PropertyDescriptor.Builder()
+            .name("Cluster Name")
+            .description("Name of the ES cluster. For example, 
elasticsearch_brew")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor HOSTS = new 
PropertyDescriptor.Builder()
+            .name("ElasticSearch Hosts")
+            .description("ElasticSearch Hosts, which should be comma separated 
and colon for hostname/port " +
+                    "host1:port,host2:port,....  For example testcluster:9300")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor PING_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("ElasticSearch Ping Timeout")
+            .description("The ping timeout used to determine when a node is 
unreachable.  " +
+                    "For example, 5s (5 seconds). If non-local recommended is 
30s")
+            .required(true)
+            .defaultValue("5s")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor SAMPLER_INTERVAL = new 
PropertyDescriptor.Builder()
+            .name("Sampler Interval")
+            .description("Node sampler interval. For example, 5s (5 seconds) 
If non-local recommended is 30s")
+            .required(true)
+            .defaultValue("5s")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor INDEX_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Index Strategy")
+            .description("Pick the index strategy. Yearly, Monthly, Daily, 
Hourly")
+            .required(true)
+            .defaultValue("Monthly")
+            .allowableValues("Yearly", "Monthly", "Daily", "Hourly")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected TransportClient esClient;
+    protected List<InetSocketAddress> esHosts;
+    protected String indexPrefix;
+    protected static String indexStrategy;
+
+    /**
+     * Instantiate ElasticSearch Client
+     * @param context
+     * @throws IOException
+     */
+    @OnScheduled
+    public final void createClient(ProcessContext context) throws IOException {
+        if (esClient != null) {
+            closeClient();
+        }
+
+        getLogger().info("Creating ElasticSearch Client");
+
+        try {
+
+            final String clusterName = 
context.getProperty(CLUSTER_NAME).toString();
+            final String pingTimeout = 
context.getProperty(PING_TIMEOUT).toString();
+            final String samplerInterval = 
context.getProperty(SAMPLER_INTERVAL).toString();
+            indexStrategy = context.getProperty(INDEX_STRATEGY).toString();
+
+            //create new transport client
+            esClient = new TransportClient(
+                    ImmutableSettings.builder()
+                            .put("cluster.name", clusterName)
+                            .put("client.transport.ping_timeout", pingTimeout)
+                            .put("client.transport.nodes_sampler_interval", 
samplerInterval),
+                    false);
+
+
+            final String hosts = context.getProperty(HOSTS).toString();
+            esHosts = GetEsHosts(hosts);
+
+            for (final InetSocketAddress host : esHosts) {
+                esClient.addTransportAddress(new 
InetSocketTransportAddress(host));
+            }
+        } catch (Exception e) {
+            getLogger().error("Failed to schedule PutElasticSearch due to {}", 
new Object[] { e }, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Dispose of ElasticSearch client
+     */
+    @OnStopped
+    public final void closeClient() {
+        if (esClient != null) {
+            getLogger().info("Closing ElasticSearch Client");
+            esClient.close();
+            esClient = null;
+        }
+    }
+
+    /**
+     * Get the ElasticSearch hosts from the Nifi attribute
+     * @param hosts A comma separted list of ElasticSearch hosts
+     * @return List of InetSockeAddresses for the ES hosts
+     */
+    private List<InetSocketAddress> GetEsHosts(String hosts){
+
+        final List<String> esList = Arrays.asList(hosts.split(","));
+        List<InetSocketAddress> esHosts = new ArrayList<>();
+
+        for(String item : esList){
+
+            String[] addresses = item.split(":");
+            final String hostName = addresses[0];
+            final int port = Integer.parseInt(addresses[1]);
+
+            esHosts.add(new InetSocketAddress(hostName, port));
+        }
+
+        return esHosts;
+
+    }
+
+
+    /**
+     * Get ElasticSearch index for data
+     * @param input
+     * @return
+     */
+    public String getIndex(final JsonObject input) {
+
+        return extractIndexString(input);
+    }
+
+    /**
+     * Get ElasticSearch Type
+     * @param input
+     * @return
+     */
+    public String getType(final JsonObject input) {
+        return "status";
+    }
+
+    /**
+     * Get id for ElasticSearch
+     * @param input
+     * @return
+     */
+    public String getId(final JsonObject input) {
+
+        return input.get("id").getAsString();
+    }
+
+    /**
+     * Get Source for ElasticSearch
+     * @param input
+     * @return
+     */
+    public byte[] getSource(final JsonObject input) {
+        String jsonString = input.toString();
+        jsonString = jsonString.replace("\r\n", " ").replace('\n', ' 
').replace('\r', ' ');
+        return jsonString.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Identify ElasticSearch index where data will land
+     * @param parsedJson
+     * @return
+     */
+    private static String extractIndexString(final JsonObject parsedJson) {
+        final String extractedDate = "created_at";
+        if(!parsedJson.has(extractedDate))
+            throw new IllegalArgumentException("Message is missing " + 
extractedDate);
+
+        final DateTimeFormatter format =
+                DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z 
yyyy").withLocale(Locale.ENGLISH);
+
+        final String dateElement = parsedJson.get(extractedDate).getAsString();
+        final DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime();
+        final DateTime dateTime = 
isoFormat.parseDateTime(format.parseDateTime(dateElement).toString());
+
+        final DateTimeFormatter dateFormat;
+        //Create ElasticSearch Index
+        switch (indexStrategy){
+
+            case "Yearly":
+                dateFormat = DateTimeFormat.forPattern("yyyy_MM");
+                break;
+            case "Monthly":
+                dateFormat = DateTimeFormat.forPattern("yyyy_MM");
+                break;
+            case "Daily":
+                dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd");
+                break;
+            case "Hourly":
+                dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd_HH");
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid index strategy 
selected: " + indexStrategy);
+
+        }
+
+        //ElasticSearch indexes must be lowercase
+        final String strategy = indexStrategy.toLowerCase() + "_" + 
dateFormat.print(dateTime);
+        return strategy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
new file mode 100644
index 0000000..9740d39
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
@@ -0,0 +1,197 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticSearchProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.*;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+import com.google.gson.*;
+
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "write", "put"})
+@CapabilityDescription("Writes the contents of a FlowFile to ElasticSearch")
+public class PutElasticSearch extends AbstractElasticSearchProcessor {
+
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to ElasticSearch are 
routed to this relationship").build();
+
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to 
ElasticSearch are routed to this relationship").build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The preferred number of FlowFiles to put to the 
database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    private final List<PropertyDescriptor> descriptors;
+
+    private final Set<Relationship> relationships;
+
+    public PutElasticSearch() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(CLUSTER_NAME);
+        descriptors.add(HOSTS);
+        descriptors.add(PING_TIMEOUT);
+        descriptors.add(SAMPLER_INTERVAL);
+        descriptors.add(BATCH_SIZE);
+        descriptors.add(INDEX_STRATEGY);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        try {
+            final BulkRequestBuilder bulk = GetEsBulkRequest(session, 
flowFiles);
+            final BulkResponse response = bulk.execute().actionGet();
+            if (response.hasFailures()) {
+                for (final BulkItemResponse item : response.getItems()) {
+                    final FlowFile flowFile = flowFiles.get(item.getItemId());
+                    if (item.isFailed()) {
+                        logger.error("Failed to insert {} into ElasticSearch 
due to {}",
+                                new Object[]{flowFile, item.getFailure()}, new 
Exception());
+                        session.transfer(flowFile, REL_FAILURE);
+
+                    } else {
+                        session.transfer(flowFile, REL_SUCCESS);
+
+                    }
+                }
+            } else {
+                for (final FlowFile flowFile : flowFiles) {
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+            }
+
+
+        } catch (NoNodeAvailableException nne) {
+            logger.error("Failed to insert {} into ElasticSearch No Node 
Available {}", new Object[]{nne}, nne);
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (ElasticsearchTimeoutException ete) {
+            logger.error("Failed to insert {} into ElasticSearch Timeout to 
{}", new Object[]{ete}, ete);
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (ReceiveTimeoutTransportException rtt) {
+            logger.error("Failed to insert {} into ElasticSearch 
ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt);
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (ElasticsearchParseException esp) {
+            logger.error("Failed to insert {} into ElasticSearch Parse 
Exception {}", new Object[]{esp}, esp);
+
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+
+        } catch (ElasticsearchException e) {
+            logger.error("Failed to insert {} into ElasticSearch due to {}", 
new Object[]{e}, e);
+
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+
+        } catch (Exception e) {
+            logger.error("Failed to insert {} into ElasticSearch due to {}", 
new Object[]{e}, e);
+
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+
+        }
+    }
+
+    /**
+     * Get the ES bulk request for the session
+     *
+     * @param session   ProcessSession
+     * @param flowFiles Flowfiles pulled off of the queue to batch in
+     * @return BulkeRequestBuilder
+     */
+    private BulkRequestBuilder GetEsBulkRequest(final ProcessSession session, 
final List<FlowFile> flowFiles) {
+
+        final BulkRequestBuilder bulk = esClient.prepareBulk();
+        for (FlowFile file : flowFiles) {
+            final byte[] content = new byte[(int) file.getSize()];
+            session.read(file, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, content, true);
+
+                    final String input = new String(content);
+                    final JsonParser parser = new JsonParser();
+                    final JsonObject json = 
parser.parse(input).getAsJsonObject();
+                    bulk.add(esClient.prepareIndex(getIndex(json), 
getType(json), getId(json))
+                            .setSource(getSource(json)));
+
+                }
+
+            });
+
+        }
+        return bulk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..7c14a42
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.elasticsearch.PutElasticSearch

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
new file mode 100644
index 0000000..c8b7ab6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
@@ -0,0 +1,103 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.*;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Manual integration tests for {@link PutElasticSearch}. They are ignored by default
+ * because they need a running ElasticSearch node at 127.0.0.1:9300 whose cluster
+ * name is "elasticsearch_brew".
+ */
+public class TestPutElasticSearch {
+
+    private InputStream twitterExample;
+    private TestRunner runner;
+
+    @Before
+    public void setUp() throws IOException {
+        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+        twitterExample = classloader.getResourceAsStream("TweetExample.json");
+        Assert.assertNotNull("TweetExample.json not found on the test classpath", twitterExample);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        runner = null;
+        if (twitterExample != null) {
+            twitterExample.close();
+        }
+    }
+
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasic() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        runner = newConfiguredRunner("1");
+
+        runner.enqueue(twitterExample);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 1);
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        runner = newConfiguredRunner("100");
+
+        final JsonParser parser = new JsonParser();
+        final String message = convertStreamToString(twitterExample);
+        for (int i = 0; i < 100; i++) {
+            final JsonObject json = parser.parse(message).getAsJsonObject();
+            final long newId = Long.parseLong(json.get("id").getAsString()) + i;
+            json.addProperty("id", newId);
+            // enqueue the modified document so every FlowFile carries a distinct id
+            runner.enqueue(json.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 100);
+    }
+
+    /**
+     * Create a runner for PutElasticSearch configured for a local cluster (Mac, pulled from brew).
+     *
+     * @param batchSize value for the Batch Size property
+     * @return the configured runner
+     */
+    private static TestRunner newConfiguredRunner(final String batchSize) {
+        final TestRunner configured = TestRunners.newTestRunner(new PutElasticSearch());
+        configured.setValidateExpressionUsage(false);
+        configured.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+        configured.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
+        configured.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
+        configured.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
+        configured.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
+        configured.setProperty(PutElasticSearch.BATCH_SIZE, batchSize);
+        return configured;
+    }
+
+    /**
+     * Read an input stream fully into a String, decoding it as UTF-8.
+     * @param is the input stream to read
+     * @return the stream contents, or "" if the stream is empty
+     */
+    static String convertStreamToString(java.io.InputStream is) {
+        java.util.Scanner s = new java.util.Scanner(is, "UTF-8").useDelimiter("\\A");
+        return s.hasNext() ? s.next() : "";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
new file mode 100644
index 0000000..7375be6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
@@ -0,0 +1,83 @@
+
+{
+  "coordinates": null,
+  "created_at": "Thu Oct 21 16:02:46 +0000 2010",
+  "favorited": false,
+  "truncated": false,
+  "id_str": "28039652140",
+  "entities": {
+    "urls": [
+      {
+        "expanded_url": null,
+        "url": "http://gnip.com/success_stories",
+        "indices": [
+          69,
+          100
+        ]
+      }
+    ],
+    "hashtags": [
+
+    ],
+    "user_mentions": [
+      {
+        "name": "Gnip, Inc.",
+        "id_str": "16958875",
+        "id": 16958875,
+        "indices": [
+          25,
+          30
+        ],
+        "screen_name": "gnip"
+      }
+    ]
+  },
+  "in_reply_to_user_id_str": null,
+  "text": "what we've been up to at @gnip -- delivering data to happy customers http://gnip.com/success_stories",
+  "contributors": null,
+  "id": 28039652140,
+  "retweet_count": null,
+  "in_reply_to_status_id_str": null,
+  "geo": null,
+  "retweeted": false,
+  "in_reply_to_user_id": null,
+  "user": {
+    "profile_sidebar_border_color": "C0DEED",
+    "name": "Gnip, Inc.",
+    "profile_sidebar_fill_color": "DDEEF6",
+    "profile_background_tile": false,
+    "profile_image_url": "http://a3.twimg.com/profile_images/62803643/icon_normal.png",
+    "location": "Boulder, CO",
+    "created_at": "Fri Oct 24 23:22:09 +0000 2008",
+    "id_str": "16958875",
+    "follow_request_sent": false,
+    "profile_link_color": "0084B4",
+    "favourites_count": 1,
+    "url": "http://blog.gnip.com",
+    "contributors_enabled": false,
+    "utc_offset": -25200,
+    "id": 16958875,
+    "profile_use_background_image": true,
+    "listed_count": 23,
+    "protected": false,
+    "lang": "en",
+    "profile_text_color": "333333",
+    "followers_count": 260,
+    "time_zone": "Mountain Time (US & Canada)",
+    "verified": false,
+    "geo_enabled": true,
+    "profile_background_color": "C0DEED",
+    "notifications": false,
+    "description": "Gnip makes it really easy for you to collect social data for your business.",
+    "friends_count": 71,
+    "profile_background_image_url": "http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png",
+    "statuses_count": 302,
+    "screen_name": "gnip",
+    "following": false,
+    "show_all_inline_media": false
+  },
+  "in_reply_to_screen_name": null,
+  "source": "web",
+  "place": null,
+  "in_reply_to_status_id": null
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
new file mode 100644
index 0000000..9945c3e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.4.1-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-elasticsearch-bundle</artifactId>
+    <packaging>pom</packaging>
+    <!-- Aggregator POM: builds the Elasticsearch processors and the NAR that packages them -->
+    <modules>
+        <module>nifi-elasticsearch-nar</module>
+        <module>nifi-elasticsearch-processors</module>
+    </modules>
+
+</project>
\ No newline at end of file

Reply via email to