Repository: nifi Updated Branches: refs/heads/master 8cf34c3ea -> 75af3a2eb
Initial commit for the elasticsearch bundle to Nifi Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e6cfcf40 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e6cfcf40 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e6cfcf40 Branch: refs/heads/master Commit: e6cfcf40d0bcafa720449e163b8981958662c2d1 Parents: 8cf34c3 Author: scarpacci <[email protected]> Authored: Tue Dec 22 10:10:45 2015 -0800 Committer: Matt Burgess <[email protected]> Committed: Tue Feb 2 17:26:39 2016 -0500 ---------------------------------------------------------------------- .../nifi-elasticsearch-nar/pom.xml | 24 ++ .../src/main/resources/META-INF/NOTICE.txt | 24 ++ .../nifi-elasticsearch-processors/pom.xml | 63 +++++ .../AbstractElasticSearchProcessor.java | 232 +++++++++++++++++++ .../elasticsearch/PutElasticSearch.java | 197 ++++++++++++++++ .../org.apache.nifi.processor.Processor | 15 ++ .../elasticsearch/TestPutElasticSearch.java | 103 ++++++++ .../src/test/resources/TweetExample.json | 83 +++++++ .../nifi-elasticsearch-bundle/pom.xml | 21 ++ 9 files changed, 762 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml new file mode 100644 index 0000000..44492a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 
<modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>nifi-elasticsearch-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>0.4.1-SNAPSHOT</version> + </parent> + + <groupId>gov.pnnl.nifi</groupId> + <artifactId>nifi-elasticsearch-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-elasticsearch-processors</artifactId> + <version>0.4.1-SNAPSHOT</version> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt new file mode 100644 index 0000000..8c660af --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt @@ -0,0 +1,24 @@ +nifi-elasticsearch-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2012 The Apache Software Foundation http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml new file mode 100644 index 0000000..3208ced --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>nifi-elasticsearch-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>0.4.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-elasticsearch-processors</artifactId> + + <properties> + <slf4jversion>1.7.12</slf4jversion> + <es.version>1.7.1</es.version> + <gsonversion>2.4</gsonversion> + <jodatimeversion>2.9.1</jodatimeversion> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + 
<groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4jversion}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>${slf4jversion}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${es.version}</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gsonversion}</version> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>${jodatimeversion}</version> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java new file mode 100644 index 0000000..6bfd37f --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java @@ -0,0 +1,232 @@ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import 
org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +import com.google.gson.*; + +public abstract class AbstractElasticSearchProcessor extends AbstractProcessor{ + + protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("Cluster Name") + .description("Name of the ES cluster. For example, elasticsearch_brew") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() + .name("ElasticSearch Hosts") + .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " + + "host1:port,host2:port,.... For example testcluster:9300") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() + .name("ElasticSearch Ping Timeout") + .description("The ping timeout used to determine when a node is unreachable. " + + "For example, 5s (5 seconds). 
If non-local recommended is 30s") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() + .name("Sampler Interval") + .description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor INDEX_STRATEGY = new PropertyDescriptor.Builder() + .name("Index Strategy") + .description("Pick the index strategy. Yearly, Monthly, Daily, Hourly") + .required(true) + .defaultValue("Monthly") + .allowableValues("Yearly", "Monthly", "Daily", "Hourly") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected TransportClient esClient; + protected List<InetSocketAddress> esHosts; + protected String indexPrefix; + protected static String indexStrategy; + + /** + * Instantiate ElasticSearch Client + * @param context + * @throws IOException + */ + @OnScheduled + public final void createClient(ProcessContext context) throws IOException { + if (esClient != null) { + closeClient(); + } + + getLogger().info("Creating ElasticSearch Client"); + + try { + + final String clusterName = context.getProperty(CLUSTER_NAME).toString(); + final String pingTimeout = context.getProperty(PING_TIMEOUT).toString(); + final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString(); + indexStrategy = context.getProperty(INDEX_STRATEGY).toString(); + + //create new transport client + esClient = new TransportClient( + ImmutableSettings.builder() + .put("cluster.name", clusterName) + .put("client.transport.ping_timeout", pingTimeout) + .put("client.transport.nodes_sampler_interval", samplerInterval), + false); + + + final String hosts = context.getProperty(HOSTS).toString(); + esHosts = GetEsHosts(hosts); + + for (final InetSocketAddress host 
: esHosts) { + esClient.addTransportAddress(new InetSocketTransportAddress(host)); + } + } catch (Exception e) { + getLogger().error("Failed to schedule PutElasticSearch due to {}", new Object[] { e }, e); + throw e; + } + } + + /** + * Dispose of ElasticSearch client + */ + @OnStopped + public final void closeClient() { + if (esClient != null) { + getLogger().info("Closing ElasticSearch Client"); + esClient.close(); + esClient = null; + } + } + + /** + * Get the ElasticSearch hosts from the Nifi attribute + * @param hosts A comma separted list of ElasticSearch hosts + * @return List of InetSockeAddresses for the ES hosts + */ + private List<InetSocketAddress> GetEsHosts(String hosts){ + + final List<String> esList = Arrays.asList(hosts.split(",")); + List<InetSocketAddress> esHosts = new ArrayList<>(); + + for(String item : esList){ + + String[] addresses = item.split(":"); + final String hostName = addresses[0]; + final int port = Integer.parseInt(addresses[1]); + + esHosts.add(new InetSocketAddress(hostName, port)); + } + + return esHosts; + + } + + + /** + * Get ElasticSearch index for data + * @param input + * @return + */ + public String getIndex(final JsonObject input) { + + return extractIndexString(input); + } + + /** + * Get ElasticSearch Type + * @param input + * @return + */ + public String getType(final JsonObject input) { + return "status"; + } + + /** + * Get id for ElasticSearch + * @param input + * @return + */ + public String getId(final JsonObject input) { + + return input.get("id").getAsString(); + } + + /** + * Get Source for ElasticSearch + * @param input + * @return + */ + public byte[] getSource(final JsonObject input) { + String jsonString = input.toString(); + jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '); + return jsonString.getBytes(StandardCharsets.UTF_8); + } + + /** + * Identify ElasticSearch index where data will land + * @param parsedJson + * @return + */ + private static String 
extractIndexString(final JsonObject parsedJson) { + final String extractedDate = "created_at"; + if(!parsedJson.has(extractedDate)) + throw new IllegalArgumentException("Message is missing " + extractedDate); + + final DateTimeFormatter format = + DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy").withLocale(Locale.ENGLISH); + + final String dateElement = parsedJson.get(extractedDate).getAsString(); + final DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime(); + final DateTime dateTime = isoFormat.parseDateTime(format.parseDateTime(dateElement).toString()); + + final DateTimeFormatter dateFormat; + //Create ElasticSearch Index + switch (indexStrategy){ + + case "Yearly": + dateFormat = DateTimeFormat.forPattern("yyyy_MM"); + break; + case "Monthly": + dateFormat = DateTimeFormat.forPattern("yyyy_MM"); + break; + case "Daily": + dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd"); + break; + case "Hourly": + dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd_HH"); + break; + default: + throw new IllegalArgumentException("Invalid index strategy selected: " + indexStrategy); + + } + + //ElasticSearch indexes must be lowercase + final String strategy = indexStrategy.toLowerCase() + "_" + dateFormat.print(dateTime); + return strategy; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java new file mode 100644 index 0000000..9740d39 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
@@ -0,0 +1,197 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticSearchProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.*;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+import com.google.gson.*;
+
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "write", "put"})
+@CapabilityDescription("Writes the contents of a FlowFile to ElasticSearch")
+public class PutElasticSearch extends AbstractElasticSearchProcessor {
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to ElasticSearch are routed to this relationship").build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to ElasticSearch are routed to this relationship").build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The preferred number of FlowFiles to put to the database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    private final List<PropertyDescriptor> descriptors;
+
+    private final Set<Relationship> relationships;
+
+    public PutElasticSearch() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(CLUSTER_NAME);
+        descriptors.add(HOSTS);
+        descriptors.add(PING_TIMEOUT);
+        descriptors.add(SAMPLER_INTERVAL);
+        descriptors.add(BATCH_SIZE);
+        descriptors.add(INDEX_STRATEGY);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        try {
+            final BulkRequestBuilder bulk = GetEsBulkRequest(session, flowFiles);
+            final BulkResponse response = bulk.execute().actionGet();
+            if (response.hasFailures()) {
+                for (final BulkItemResponse item : response.getItems()) {
+                    final FlowFile flowFile = flowFiles.get(item.getItemId());
+                    if (item.isFailed()) {
+                        logger.error("Failed to insert {} into ElasticSearch due to {}",
+                                new Object[]{flowFile, item.getFailureMessage()});
+                        session.transfer(flowFile, REL_FAILURE);
+
+                    } else {
+                        session.transfer(flowFile, REL_SUCCESS);
+
+                    }
+                }
+            } else {
+                for (final FlowFile flowFile : flowFiles) {
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+            }
+
+            // Connectivity and timeout problems go to retry; every other failure goes to failure
+        } catch (NoNodeAvailableException nne) {
+            logger.error("Failed to insert into ElasticSearch; no node available: {}", new Object[]{nne}, nne);
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (ElasticsearchTimeoutException ete) {
+            logger.error("Failed to insert into ElasticSearch due to timeout: {}", new Object[]{ete}, ete);
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (ReceiveTimeoutTransportException rtt) {
+            logger.error("Failed to insert into ElasticSearch due to receive timeout: {}", new Object[]{rtt}, rtt);
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (ElasticsearchParseException esp) {
+            logger.error("Failed to insert into ElasticSearch due to parse exception: {}", new Object[]{esp}, esp);
+
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+
+        } catch (ElasticsearchException e) {
+            logger.error("Failed to insert into ElasticSearch due to {}", new Object[]{e}, e);
+
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+
+        } catch (Exception e) {
+            logger.error("Failed to insert into ElasticSearch due to {}", new Object[]{e}, e);
+
+            for (final FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+
+        }
+    }
+
+    /**
+     * Get the ES bulk request for the session
+     *
+     * @param session ProcessSession
+     * @param flowFiles Flowfiles pulled off of the queue to batch in
+     * @return BulkRequestBuilder holding one index request per FlowFile, in FlowFile order
+     */
+    private BulkRequestBuilder GetEsBulkRequest(final ProcessSession session, final List<FlowFile> flowFiles) {
+
+        final BulkRequestBuilder bulk = esClient.prepareBulk();
+        for (FlowFile file : flowFiles) {
+            final byte[] content = new byte[(int) file.getSize()];
+            session.read(file, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, content, true);
+                    // Decode explicitly as UTF-8 to match getSource(), instead of the platform default charset
+                    final String input = new String(content, java.nio.charset.StandardCharsets.UTF_8);
+                    final JsonParser parser = new JsonParser();
+                    final JsonObject json = parser.parse(input).getAsJsonObject();
+                    bulk.add(esClient.prepareIndex(getIndex(json), getType(json), getId(json))
+                            .setSource(getSource(json)));
+
+                }
+
+            });
+
+        }
+        return bulk;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..7c14a42
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.
See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.elasticsearch.PutElasticSearch
http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
new file mode 100644
index 0000000..c8b7ab6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
@@ -0,0 +1,103 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.*;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class TestPutElasticSearch {
+
+    private InputStream twitterExample;
+    private TestRunner runner;
+
+    @Before
+    public void setUp() throws IOException {
+        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+        twitterExample = classloader
+                .getResourceAsStream("TweetExample.json");
+
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+
+    }
+
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasic() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch());
+        runner.setValidateExpressionUsage(false);
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+        runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
+        runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
+        runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
+        runner.setProperty(PutElasticSearch.BATCH_SIZE, "1");
+
+
+        runner.enqueue(twitterExample);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
+
+
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch());
+        runner.setValidateExpressionUsage(false);
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+        runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
+        runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
+        runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
+        runner.setProperty(PutElasticSearch.BATCH_SIZE, "100");
+
+        JsonParser parser = new JsonParser();
+        JsonObject json;
+        String message = convertStreamToString(twitterExample);
+        for (int i = 0;i < 100; i++){
+            // Enqueue the document carrying the rewritten id; enqueuing the original message would send the same id 100 times
+            json = parser.parse(message).getAsJsonObject();
+            String id = json.get("id").getAsString();
+            long newId = Long.parseLong(id) + i;
+            json.addProperty("id", newId);
+            runner.enqueue(json.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        }
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 100);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
+
+    }
+
+    /**
+     * Convert an input stream to a string
+     * @param is the input stream to read, decoded as UTF-8
+     * @return the whole content of the stream as a string, or "" when the stream is empty
+     */
+    static String convertStreamToString(java.io.InputStream is) {
+        java.util.Scanner s = new java.util.Scanner(is, "UTF-8").useDelimiter("\\A");
+        return s.hasNext() ? s.next() : "";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
new file mode 100644
index 0000000..7375be6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
@@ -0,0 +1,83 @@
+
+{
+    "coordinates": null,
+    "created_at": "Thu Oct 21 16:02:46 +0000 2010",
+    "favorited": false,
+    "truncated": false,
+    "id_str": "28039652140",
+    "entities": {
+        "urls": [
+            {
+                "expanded_url": null,
+                "url": "http://gnip.com/success_stories",
+                "indices": [
+                    69,
+                    100
+                ]
+            }
+        ],
+        "hashtags": [
+
+        ],
+        "user_mentions": [
+            {
+                "name": "Gnip, Inc.",
+                "id_str": "16958875",
+                "id": 16958875,
+                "indices": [
+                    25,
+                    30
+                ],
+                "screen_name": "gnip"
+            }
+        ]
+    },
+    "in_reply_to_user_id_str": null,
+    "text": "what we've been up to at @gnip -- delivering data to happy customers http://gnip.com/success_stories",
+    "contributors": null,
+    "id": 28039652140,
+    "retweet_count": null,
+    "in_reply_to_status_id_str": null,
+    "geo": null,
+    "retweeted": false,
+    "in_reply_to_user_id": null,
+    "user": {
+        "profile_sidebar_border_color": "C0DEED",
+        "name": "Gnip, Inc.",
+        "profile_sidebar_fill_color": "DDEEF6",
+        "profile_background_tile": false,
+        "profile_image_url": "http://a3.twimg.com/profile_images/62803643/icon_normal.png",
+        "location": "Boulder, CO",
+        "created_at": "Fri Oct 24 23:22:09 +0000 2008",
+        "id_str": "16958875",
+        "follow_request_sent": false,
+        "profile_link_color": "0084B4",
+        "favourites_count": 1,
+        "url": "http://blog.gnip.com",
+        "contributors_enabled": false,
+
"utc_offset": -25200, + "id": 16958875, + "profile_use_background_image": true, + "listed_count": 23, + "protected": false, + "lang": "en", + "profile_text_color": "333333", + "followers_count": 260, + "time_zone": "Mountain Time (US & Canada)", + "verified": false, + "geo_enabled": true, + "profile_background_color": "C0DEED", + "notifications": false, + "description": "Gnip makes it really easy for you to collect social data for your business.", + "friends_count": 71, + "profile_background_image_url": "http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png", + "statuses_count": 302, + "screen_name": "gnip", + "following": false, + "show_all_inline_media": false + }, + "in_reply_to_screen_name": null, + "source": "web", + "place": null, + "in_reply_to_status_id": null +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e6cfcf40/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml new file mode 100644 index 0000000..9945c3e --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.4.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-elasticsearch-bundle</artifactId> + <packaging>pom</packaging> + <modules> + <module>nifi-elasticsearch-nar</module> + <module>nifi-elasticsearch-processors</module> + </modules> + +</project> \ No newline at end of file
