http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java new file mode 100644 index 0000000..6344e2c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Tags({"Apache", "Solr", "Get", "Pull"}) +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +public class GetSolr extends SolrProcessor { + + public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor + 
.Builder().name("Solr Query") + .description("A query to execute against Solr") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor + .Builder().name("Return Fields") + .description("Comma-separated list of fields names to return") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor + .Builder().name("Sort Clause") + .description("A Solr sort clause, ex: field1 asc, field2 desc") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor + .Builder().name("Date Field") + .description("The name of a date field in Solr used to filter results") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor + .Builder().name("Batch Size") + .description("Number of rows per Solr query") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The results of querying Solr") + .build(); + + static final String FILE_PREFIX = "conf/.getSolr-"; + static final String LAST_END_DATE = "LastEndDate"; + static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + static final String UNINITIALIZED_LAST_END_DATE_VALUE; + + static { + SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L)); + } + + final AtomicReference<String> lastEndDatedRef = new AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + private final Lock fileLock = new ReentrantLock(); + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(COLLECTION); + descriptors.add(SOLR_QUERY); + descriptors.add(RETURN_FIELDS); + descriptors.add(SORT_CLAUSE); + descriptors.add(DATE_FIELD); + descriptors.add(BATCH_SIZE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); + } + + @OnShutdown + public void onShutdown() { + writeLastEndDate(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + readLastEndDate(); + + final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + final String 
currDate = sdf.format(new Date()); + + final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); + + final String query = context.getProperty(SOLR_QUERY).getValue(); + final SolrQuery solrQuery = new SolrQuery(query); + solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); + + // if initialized then apply a filter to restrict results from the last end time til now + if (initialized) { + StringBuilder filterQuery = new StringBuilder(); + filterQuery.append(context.getProperty(DATE_FIELD).getValue()) + .append(":{").append(lastEndDatedRef.get()).append(" TO ") + .append(currDate).append("]"); + solrQuery.addFilterQuery(filterQuery.toString()); + logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); + } + + final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); + if (returnFields != null && !returnFields.trim().isEmpty()) { + for (String returnField : returnFields.trim().split("[,]")) { + solrQuery.addField(returnField.trim()); + } + } + + final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); + if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { + for (String sortClause : fullSortClause.split("[,]")) { + String[] sortParts = sortClause.trim().split("[ ]"); + solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); + } + } + + try { + // run the initial query and send out the first page of results + final StopWatch stopWatch = new StopWatch(true); + QueryResponse response = getSolrClient().query(solrQuery); + stopWatch.stop(); + + long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + + final SolrDocumentList documentList = response.getResults(); + logger.info("Retrieved {} results from Solr for {} in {} ms", + new Object[] {documentList.getNumFound(), query, duration}); + + if (documentList != null && documentList.getNumFound() > 0) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); + session.transfer(flowFile, REL_SUCCESS); + + StringBuilder transitUri = new StringBuilder("solr://"); + transitUri.append(context.getProperty(SOLR_LOCATION).getValue()); + if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { + transitUri.append("/").append(context.getProperty(COLLECTION).getValue()); + } + + session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); + + // if initialized then page through the results and send out each page + if (initialized) { + int endRow = response.getResults().size(); + long totalResults = response.getResults().getNumFound(); + + while (endRow < totalResults) { + solrQuery.setStart(endRow); + + stopWatch.start(); + response = getSolrClient().query(solrQuery); + stopWatch.stop(); + + duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration}); + + flowFile = session.create(); + flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); + endRow += response.getResults().size(); + } + } + } + + lastEndDatedRef.set(currDate); + writeLastEndDate(); + } catch (SolrServerException | IOException e) { + context.yield(); + session.rollback(); + logger.error("Failed to execute query {} due to {}", new Object[]{query, e}, e); + throw new ProcessException(e); + } catch (final Throwable t) { 
+ context.yield(); + session.rollback(); + logger.error("Failed to execute query {} due to {}", new Object[]{query, t}, t); + throw t; + } + } + + private void readLastEndDate() { + fileLock.lock(); + File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); + try (FileInputStream fis = new FileInputStream(lastEndDateCache)) { + Properties props = new Properties(); + props.load(fis); + lastEndDatedRef.set(props.getProperty(LAST_END_DATE)); + } catch (IOException swallow) { + } finally { + fileLock.unlock(); + } + } + + private void writeLastEndDate() { + fileLock.lock(); + File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); + try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) { + Properties props = new Properties(); + props.setProperty(LAST_END_DATE, lastEndDatedRef.get()); + props.store(fos, "GetSolr LastEndDate value"); + } catch (IOException e) { + getLogger().error("Failed to persist LastEndDate due to " + e, e); + } finally { + fileLock.unlock(); + } + } + + /** + * Writes each SolrDocument in XML format to the OutputStream. + */ + private class QueryResponseOutputStreamCallback implements OutputStreamCallback { + private QueryResponse response; + + public QueryResponseOutputStreamCallback(QueryResponse response) { + this.response = response; + } + + @Override + public void process(OutputStream out) throws IOException { + for (SolrDocument doc : response.getResults()) { + String xml = ClientUtils.toXML(ClientUtils.toSolrInputDocument(doc)); + IOUtils.write(xml, out); + } + } + } +}
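
The incremental-fetch behavior in GetSolr above amounts to a date-range filter query plus row-offset paging: on each trigger it filters the Date Field to the window between the last recorded end date and "now", then walks the results in Batch Size pages. The standalone SolrJ sketch below is not part of the patch and only illustrates that pattern; the Solr URL, core name, field names, and dates are hypothetical placeholders.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

import java.io.IOException;

public class DateWindowPagingSketch {

    public static void main(String[] args) throws IOException, SolrServerException {
        // Hypothetical standalone Solr URL; GetSolr builds its client the same way via SOLR_LOCATION.
        SolrClient solr = new HttpSolrClient("http://localhost:8983/solr/gettingstarted");
        try {
            SolrQuery query = new SolrQuery("last:smith");
            query.setRows(100); // corresponds to the Batch Size property

            // Same shape as the filter GetSolr adds: exclusive lower bound, inclusive upper bound.
            String lastEndDate = "2015-01-01T00:00:00.000Z"; // placeholder values
            String currDate = "2015-02-01T00:00:00.000Z";
            query.addFilterQuery("created:{" + lastEndDate + " TO " + currDate + "]");

            int start = 0;
            long numFound;
            do {
                query.setStart(start);
                QueryResponse response = solr.query(query);
                numFound = response.getResults().getNumFound();
                if (response.getResults().isEmpty()) {
                    break; // nothing left to page through
                }
                // in the processor, each page of results becomes one FlowFile
                start += response.getResults().size();
            } while (start < numFound);
        } finally {
            solr.close();
        }
    }
}

Note that the processor only applies the filter and paging once it has a persisted LastEndDate; the very first run sends the whole result set as a single FlowFile and then records the end date for subsequent windows.
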
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java new file mode 100644 index 0000000..6eb287b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; +import org.apache.nifi.util.StopWatch; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.apache.solr.common.util.ContentStreamBase; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +@Tags({"Apache", "Solr", "Put", "Send"}) +@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr") +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value", + description="These parameters will be passed to Solr on the request") +public class PutSolrContentStream extends 
SolrProcessor { + + public static final PropertyDescriptor CONTENT_STREAM_PATH = new PropertyDescriptor + .Builder().name("Content Stream Path") + .description("The path in Solr to post the ContentStream") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("/update/json/docs") + .build(); + + public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor + .Builder().name("Content-Type") + .description("Content-Type being sent to Solr") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("application/json") + .build(); + + public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor + .Builder().name("Commit Within") + .description("The number of milliseconds before the given update is committed") + .required(false) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The original FlowFile") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed for any reason other than Solr being unreachable") + .build(); + + public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder() + .name("connection_failure") + .description("FlowFiles that failed because Solr is unreachable") + .build(); + + public static final String COLLECTION_PARAM_NAME = "collection"; + public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin"; + public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+"; + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(COLLECTION); + descriptors.add(CONTENT_STREAM_PATH); + descriptors.add(CONTENT_TYPE); + descriptors.add(COMMIT_WITHIN); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_CONNECTION_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final ObjectHolder<Exception> error = new ObjectHolder<>(null); + final ObjectHolder<Exception> connectionError = new ObjectHolder<>(null); + + final boolean 
isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); + final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); + + final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); + + StopWatch timer = new StopWatch(true); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH) + .evaluateAttributeExpressions().getValue(); + + ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(contentStreamPath); + request.setParams(new ModifiableSolrParams()); + + // add the extra params, don't use 'set' in case of repeating params + Iterator<String> paramNames = requestParams.getParameterNamesIterator(); + while (paramNames.hasNext()) { + String paramName = paramNames.next(); + for (String paramValue : requestParams.getParams(paramName)) { + request.getParams().add(paramName, paramValue); + } + } + + // specify the collection for SolrCloud + if (isSolrCloud) { + request.setParam(COLLECTION_PARAM_NAME, collection); + } + + if (commitWithin != null && commitWithin > 0) { + request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString()); + } + + try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) { + // add the FlowFile's content on the UpdateRequest + request.addContentStream(new ContentStreamBase() { + @Override + public InputStream getStream() throws IOException { + return bufferedIn; + } + + @Override + public String getContentType() { + return context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue(); + } + }); + + UpdateResponse response = request.process(getSolrClient()); + getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()}); + } catch (SolrException e) { + error.set(e); + } catch (SolrServerException e) { + if (causedByIOException(e)) { + connectionError.set(e); + } else { + error.set(e); + } + } catch (IOException e) { + connectionError.set(e); + } + } + }); + timer.stop(); + + if (error.get() != null) { + getLogger().error("Failed to send {} to Solr due to {}; routing to failure", + new Object[]{flowFile, error.get()}); + session.transfer(flowFile, REL_FAILURE); + } else if (connectionError.get() != null) { + getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure", + new Object[]{flowFile, connectionError.get()}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_CONNECTION_FAILURE); + } else { + StringBuilder transitUri = new StringBuilder("solr://"); + transitUri.append(context.getProperty(SOLR_LOCATION).getValue()); + if (isSolrCloud) { + transitUri.append(":").append(collection); + } + + final long duration = timer.getDuration(TimeUnit.MILLISECONDS); + session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true); + getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration}); + session.transfer(flowFile, REL_SUCCESS); + } + } + + private boolean causedByIOException(SolrServerException e) { + boolean foundIOException = false; + Throwable cause = e.getCause(); + while (cause != null) { + if (cause instanceof IOException) { + foundIOException = true; + break; + } + cause = cause.getCause(); + } + return 
foundIOException; + } + + // get all of the dynamic properties and values into a Map for later adding to the Solr request + private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) { + final Map<String,String[]> paramsMap = new HashMap<>(); + final SortedMap<String,String> repeatingParams = new TreeMap<>(); + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + final String paramName = descriptor.getName(); + final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); + + if (!paramValue.trim().isEmpty()) { + if (paramName.matches(REPEATING_PARAM_PATTERN)) { + repeatingParams.put(paramName, paramValue); + } else { + MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); + } + } + } + } + + for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) { + final String paramName = entry.getKey(); + final String paramValue = entry.getValue(); + final int idx = paramName.lastIndexOf("."); + MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap); + } + + return paramsMap; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java new file mode 100644 index 0000000..27f208a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * A base class for processors that interact with Apache Solr. + * + */ +public abstract class SolrProcessor extends AbstractProcessor { + + public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue( + "Cloud", "Cloud", "A SolrCloud instance."); + + public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( + "Standard", "Standard", "A stand-alone Solr instance."); + + public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor + .Builder().name("Solr Type") + .description("The type of Solr instance, Cloud or Standard.") + .required(true) + .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD) + .defaultValue(SOLR_TYPE_STANDARD.getValue()) + .build(); + + public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor + .Builder().name("Solr Location") + .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " + + "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COLLECTION = new PropertyDescriptor + .Builder().name("Collection") + .description("The Solr collection name, only used with a Solr Type of Cloud") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + private volatile SolrClient solrClient; + + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + this.solrClient = createSolrClient(context); + } + + /** + * Create a SolrClient based on the type of Solr specified. 
+ * + * @param context + * The context + * @return an HttpSolrClient or CloudSolrClient + */ + protected SolrClient createSolrClient(final ProcessContext context) { + if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { + return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue()); + } else { + CloudSolrClient cloudSolrClient = new CloudSolrClient( + context.getProperty(SOLR_LOCATION).getValue()); + cloudSolrClient.setDefaultCollection( + context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue()); + return cloudSolrClient; + } + } + + /** + * Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the + * {@link #createSolrClient(org.apache.nifi.processor.ProcessContext)} method + * + * @return an HttpSolrClient or CloudSolrClient + */ + protected final SolrClient getSolrClient() { + return solrClient; + } + + @Override + protected final Collection<ValidationResult> customValidate(ValidationContext context) { + final List<ValidationResult> problems = new ArrayList<>(); + + if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { + final String collection = context.getProperty(COLLECTION).getValue(); + if (collection == null || collection.trim().isEmpty()) { + problems.add(new ValidationResult.Builder() + .subject(COLLECTION.getName()) + .input(collection).valid(false) + .explanation("A collection must specified for Solr Type of Cloud") + .build()); + } + } + + Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context); + if (otherProblems != null) { + problems.addAll(otherProblems); + } + + return problems; + } + + /** + * Allows additional custom validation to be done. This will be called from + * the parent's customValidation method. + * + * @param context + * The context + * @return Validation results indicating problems + */ + protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) { + return new ArrayList<>(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..657d0e8 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.solr.PutSolrContentStream +org.apache.nifi.processors.solr.GetSolr http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html new file mode 100644 index 0000000..054cdb6 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html @@ -0,0 +1,48 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>PutSolrContentStream</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Usage Example</h2> +<p> + This processor streams the contents of a FlowFile to an Apache Solr + update handler. Any properties added to this processor by the user are + passed to Solr on the update request. If a parameter must be sent multiple + times with different values, properties can follow a naming convention: + name.number, where name is the parameter name and number is a unique number. + Repeating parameters will be sorted by their property name. 
+</p> +<p> + Example: To specify multiple 'f' parameters for indexing custom json, the following + properties can be defined: +</p> +<ul> + <li><strong>split</strong>: /exams</li> + <li><strong>f.1</strong>: first:/first</li> + <li><strong>f.2</strong>: last:/last</li> + <li><strong>f.3</strong>: grade:/grade</li> +</ul> +<p> + This will result in sending the following url to Solr: </br> + split=/exams&f=first:/first&f=last:/last&f=grade:/grade +</p> +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java new file mode 100644 index 0000000..5e21657 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.commons.io.FileUtils; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * Helper to create EmbeddedSolrServer instances for testing. + */ +public class EmbeddedSolrServerFactory { + + public static final String DEFAULT_SOLR_HOME = "src/test/resources/solr"; + public static final String DEFAULT_CORE_HOME = "src/test/resources/"; + public static final String DEFAULT_DATA_DIR = "target"; + + /** + * Use the defaults to create the core. 
+ * + * @param coreName the name of the core + * @return an EmbeddedSolrServer for the given core + */ + public static SolrClient create(String coreName) throws IOException { + return create(DEFAULT_SOLR_HOME, DEFAULT_CORE_HOME, + coreName, DEFAULT_DATA_DIR); + } + + /** + * + * @param solrHome + * path to directory where solr.xml lives + * @param coreName + * the name of the core to load + * @param dataDir + * the data dir for the core + * + * @return an EmbeddedSolrServer for the given core + */ + public static SolrClient create(String solrHome, String coreHome, String coreName, String dataDir) + throws IOException { + + Properties props = new Properties(); + if (dataDir != null) { + File coreDataDir = new File(dataDir + "/" + coreName); + if (coreDataDir.exists()) { + FileUtils.deleteDirectory(coreDataDir); + } + props.setProperty("dataDir", dataDir + "/" + coreName); + } + + CoreContainer coreContainer = new CoreContainer(solrHome); + coreContainer.load(); + + CoreDescriptor descriptor = new CoreDescriptor(coreContainer, coreName, + new File(coreHome, coreName).getAbsolutePath(), props); + + coreContainer.create(descriptor); + return new EmbeddedSolrServer(coreContainer, coreName); + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java new file mode 100644 index 0000000..b0f5e68 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.Properties; +import java.util.TimeZone; + +import static org.junit.Assert.assertTrue; + +public class TestGetSolr { + + static final String DEFAULT_SOLR_CORE = "testCollection"; + + private SolrClient solrClient; + + @Before + public void setup() { + // create the conf dir if it doesn't exist + File confDir = new File("conf"); + if (!confDir.exists()) { + confDir.mkdir(); + } + + try { + // create an EmbeddedSolrServer for the processor to use + String relPath = getClass().getProtectionDomain().getCodeSource() + .getLocation().getFile() + "../../target"; + + solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, + EmbeddedSolrServerFactory.DEFAULT_CORE_HOME, DEFAULT_SOLR_CORE, relPath); + + // create some test documents + SolrInputDocument doc1 = new SolrInputDocument(); + doc1.addField("first", "bob"); + doc1.addField("last", "smith"); + doc1.addField("created", new Date()); + + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField("first", "alice"); + doc2.addField("last", "smith"); + doc2.addField("created", new Date()); + + SolrInputDocument doc3 = new SolrInputDocument(); + doc3.addField("first", "mike"); + doc3.addField("last", "smith"); + doc3.addField("created", new Date()); + + SolrInputDocument doc4 = new SolrInputDocument(); + doc4.addField("first", "john"); + doc4.addField("last", "smith"); + doc4.addField("created", new Date()); + + SolrInputDocument doc5 = new SolrInputDocument(); + doc5.addField("first", "joan"); + doc5.addField("last", "smith"); + doc5.addField("created", new Date()); + + // add the test data to the index + solrClient.add(Arrays.asList(doc1, doc2, doc3, doc4, doc5)); + solrClient.commit(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @After + public void teardown() { + File confDir = new File("conf"); + assertTrue(confDir.exists()); + File[] files = confDir.listFiles(); + assertTrue(files.length > 0); + for (File file : files) { + assertTrue("Failed to delete " + file.getName(), file.delete()); + } + assertTrue(confDir.delete()); + + try { + solrClient.shutdown(); + } catch (Exception e) { + } + } + + @Test + public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles() throws IOException, SolrServerException { + final TestableProcessor proc = new TestableProcessor(solrClient); + final TestRunner runner = TestRunners.newTestRunner(proc); + + // setup a lastEndDate file to simulate picking up from a previous end date + SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + Calendar cal = new GregorianCalendar(); + cal.add(Calendar.MINUTE, -30); + final String lastEndDate = sdf.format(cal.getTime()); + + File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier()); + try 
(FileOutputStream fos = new FileOutputStream(lastEndDateCache)) { + Properties props = new Properties(); + props.setProperty(GetSolr.LAST_END_DATE, lastEndDate); + props.store(fos, "GetSolr LastEndDate value"); + } catch (IOException e) { + Assert.fail("Failed to setup last end date value: " + e.getMessage()); + } + + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); + runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created"); + runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "2"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3); + } + + @Test + public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); + runner.setProperty(GetSolr.RETURN_FIELDS, "created"); + runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "10"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); + } + + @Test + public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz"); + runner.setProperty(GetSolr.RETURN_FIELDS, "created"); + runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "10"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); + } + + + // Override createSolrClient and return the passed in SolrClient + private class TestableProcessor extends GetSolr { + private SolrClient solrClient; + + public TestableProcessor(SolrClient solrClient) { + this.solrClient = solrClient; + } + @Override + protected SolrClient createSolrClient(ProcessContext context) { + return solrClient; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java new file mode 100644 index 0000000..eaa009c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for PutSolr processor. + */ +public class TestPutSolrContentStream { + + static final String DEFAULT_SOLR_CORE = "testCollection"; + + static final String CUSTOM_JSON_SINGLE_DOC_FILE = "src/test/resources/testdata/test-custom-json-single-doc.json"; + static final String SOLR_JSON_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-solr-json-multiple-docs.json"; + static final String CSV_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-csv-multiple-docs.csv"; + static final String XML_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-xml-multiple-docs.xml"; + + static final SolrDocument expectedDoc1 = new SolrDocument(); + static { + expectedDoc1.addField("first", "John"); + expectedDoc1.addField("last", "Doe"); + expectedDoc1.addField("grade", 8); + expectedDoc1.addField("subject", "Math"); + expectedDoc1.addField("test", "term1"); + expectedDoc1.addField("marks", 90); + } + + static final SolrDocument expectedDoc2 = new SolrDocument(); + static { + expectedDoc2.addField("first", "John"); + expectedDoc2.addField("last", "Doe"); + expectedDoc2.addField("grade", 8); + expectedDoc2.addField("subject", "Biology"); + expectedDoc2.addField("test", "term1"); + expectedDoc2.addField("marks", 86); + } + + /** + * Creates a base TestRunner with Solr Type of standard. 
+ */ + private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) { + TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + return runner; + } + + @Test + public void testUpdateWithSolrJson() throws IOException, SolrServerException { + final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE); + final TestableProcessor proc = new TestableProcessor(solrClient); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs"); + runner.setProperty("json.command", "false"); + + try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); + + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { + proc.getSolrClient().close(); + } catch (Exception e) { + } + } + } + + @Test + public void testUpdateWithCustomJson() throws IOException, SolrServerException { + final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE); + final TestableProcessor proc = new TestableProcessor(solrClient); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs"); + runner.setProperty("split", "/exams"); + runner.setProperty("f.1", "first:/first"); + runner.setProperty("f.2", "last:/last"); + runner.setProperty("f.3", "grade:/grade"); + runner.setProperty("f.4", "subject:/exams/subject"); + runner.setProperty("f.5", "test:/exams/test"); + runner.setProperty("f.6", "marks:/exams/marks"); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); + + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { + proc.getSolrClient().close(); + } catch (Exception e) { + } + } + } + + @Test + public void testUpdateWithCsv() throws IOException, SolrServerException { + final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE); + final TestableProcessor proc = new TestableProcessor(solrClient); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv"); + runner.setProperty("fieldnames", "first,last,grade,subject,test,marks"); + + try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); + + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { + proc.getSolrClient().close(); + } catch (Exception e) { + } + } + } + + @Test + public void 
testUpdateWithXml() throws IOException, SolrServerException { + final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE); + final TestableProcessor proc = new TestableProcessor(solrClient); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update"); + runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml"); + + try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); + + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { + proc.getSolrClient().close(); + } catch (Exception e) { + } + } + } + + @Test + public void testDeleteWithXml() throws IOException, SolrServerException { + final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE); + final TestableProcessor proc = new TestableProcessor(solrClient); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update"); + runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml"); + runner.setProperty("commit", "true"); + + // add a document so there is something to delete + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("first", "bob"); + doc.addField("last", "smith"); + doc.addField("created", new Date()); + + solrClient.add(doc); + solrClient.commit(); + + // prove the document got added + SolrQuery query = new SolrQuery("*:*"); + QueryResponse qResponse = solrClient.query(query); + Assert.assertEquals(1, qResponse.getResults().getNumFound()); + + // run the processor with a delete-by-query command + runner.enqueue("<delete><query>first:bob</query></delete>".getBytes("UTF-8")); + runner.run(); + + // prove the document got deleted + qResponse = solrClient.query(query); + Assert.assertEquals(0, qResponse.getResults().getNumFound()); + } + + @Test + public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrServerException("Invalid Document"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + + @Test + public void testSolrServerExceptionCausedByIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrServerException(new IOException("Error communicating with Solr")); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + + @Test + public void 
testSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + + @Test + public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException { + final Throwable throwable = new HttpSolrClient.RemoteSolrException( + "host", 401, "error", new NumberFormatException()); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + + @Test + public void testIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException { + final Throwable throwable = new IOException("Error communicating with Solr"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null)); + } + } + + @Test + public void testSolrTypeCloudShouldRequireCollection() { + final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.assertNotValid(); + + runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1"); + runner.assertValid(); + } + + @Test + public void testSolrTypeStandardShouldNotRequireCollection() { + final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.assertValid(); + } + + + // Override the createSolrClient method to inject a Mock. 
+ private class ExceptionThrowingProcessor extends PutSolrContentStream { + + private SolrClient mockSolrClient; + private Throwable throwable; + + public ExceptionThrowingProcessor(Throwable throwable) { + this.throwable = throwable; + } + + @Override + protected SolrClient createSolrClient(ProcessContext context) { + mockSolrClient = Mockito.mock(SolrClient.class); + try { + when(mockSolrClient.request(any(SolrRequest.class), + eq((String)null))).thenThrow(throwable); + } catch (SolrServerException e) { + Assert.fail(e.getMessage()); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + return mockSolrClient; + } + + } + + // Override createSolrClient and return the passed in SolrClient + private class TestableProcessor extends PutSolrContentStream { + private SolrClient solrClient; + + public TestableProcessor(SolrClient solrClient) { + this.solrClient = solrClient; + } + @Override + protected SolrClient createSolrClient(ProcessContext context) { + return solrClient; + } + } + + // Create an embedded SolrClient with the given core name. + private static SolrClient createEmbeddedSolrClient(String coreName) throws IOException { + String relPath = TestPutSolrContentStream.class.getProtectionDomain() + .getCodeSource().getLocation().getFile() + + "../../target"; + + return EmbeddedSolrServerFactory.create( + EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, + EmbeddedSolrServerFactory.DEFAULT_CORE_HOME, + coreName, relPath); + } + + /** + * Verify that the given SolrClient contains the expected SolrDocuments. + */ + private static void verifySolrDocuments(SolrClient solrServer, Collection<SolrDocument> expectedDocuments) + throws IOException, SolrServerException { + + solrServer.commit(); + + SolrQuery query = new SolrQuery("*:*"); + QueryResponse qResponse = solrServer.query(query); + Assert.assertEquals(expectedDocuments.size(), qResponse.getResults().getNumFound()); + + // verify documents have expected fields and values + for (SolrDocument expectedDoc : expectedDocuments) { + boolean found = false; + for (SolrDocument solrDocument : qResponse.getResults()) { + boolean foundAllFields = true; + for (String expectedField : expectedDoc.getFieldNames()) { + Object expectedVal = expectedDoc.getFirstValue(expectedField); + Object actualVal = solrDocument.getFirstValue(expectedField); + foundAllFields = foundAllFields && expectedVal.equals(actualVal); + } + + if (foundAllFields) { + found = true; + break; + } + } + Assert.assertTrue("Could not find " + expectedDoc, found); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000..4a3bdd3 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties @@ -0,0 +1,14 @@ +# Logging level +solr.log=logs/ +log4j.rootLogger=INFO, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n + +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.org.apache.hadoop=WARN + +# set to INFO to enable infostream log messages
+log4j.logger.org.apache.solr.update.LoggingInfoStream=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml new file mode 100644 index 0000000..86fb3db --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml @@ -0,0 +1,18 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<solr> + + <solrcloud> + <str name="host">${host:}</str> + <int name="hostPort">${jetty.port:8983}</int> + <str name="hostContext">${hostContext:solr}</str> + <int name="zkClientTimeout">${zkClientTimeout:30000}</int> + <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool> + </solrcloud> + + <shardHandlerFactory name="shardHandlerFactory" + class="HttpShardHandlerFactory"> + <int name="socketTimeout">${socketTimeout:0}</int> + <int name="connTimeout">${connTimeout:0}</int> + </shardHandlerFactory> + +</solr> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json new file mode 100644 index 0000000..e7ada3f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json @@ -0,0 +1,3 @@ +{ + "initArgs":{}, + "managedList":[]} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt new file mode 100644 index 0000000..2c164c0 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# a couple of test stopwords to test that the words are really being +# configured from this file: +stopworda +stopwordb + +# Standard english stop words taken from Lucene's StopAnalyzer +a +an +and +are +as +at +be +but +by +for +if +in +into +is +it +no +not +of +on +or +such +that +the +their +then +there +these +they +this +to +was +will +with http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt new file mode 100644 index 0000000..1dfc0ab --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt @@ -0,0 +1,21 @@ +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#----------------------------------------------------------------------- +# Use a protected word file to protect against the stemmer reducing two +# unrelated words to the same base word. + +# Some non-words that normally won't be encountered, +# just to test that they won't be stemmed. 
+dontstems +zwhacky + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml new file mode 100644 index 0000000..d2f7e8f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<schema version="1.5" name="testCollection"> + + <fieldType name="string" class="solr.StrField"/> + <fieldType name="date" class="solr.TrieDateField" precisionStep="0" positionIncrementGap="0"/> + <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/> + <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/> + <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/> + <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/> + + <field name="_version_" type="long" indexed="true" stored="true"/> + + <field name="first" type="string" indexed="true" stored="true" /> + <field name="last" type="string" indexed="true" stored="true" /> + <field name="grade" type="int" indexed="true" stored="true" /> + <field name="marks" type="int" indexed="true" stored="true" /> + <field name="test" type="string" indexed="true" stored="true" /> + <field name="subject" type="string" indexed="true" stored="true" /> + <field name="created" type="date" indexed="true" stored="true" /> + +</schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml new file mode 100644 index 0000000..148a2db --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<config> + <luceneMatchVersion>5.0.0</luceneMatchVersion> + + <dataDir>${solr.data.dir:}</dataDir> + + <directoryFactory name="DirectoryFactory" + class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/> + <indexConfig> + <lockType>single</lockType> + </indexConfig> + + <requestDispatcher handleSelect="false"> + <httpCaching never304="true" /> + </requestDispatcher> + + <requestHandler name="/select" class="solr.SearchHandler" /> + <requestHandler name="/update" class="solr.UpdateRequestHandler" /> + +</config> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt new file mode 100644 index 0000000..7f72128 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt @@ -0,0 +1,29 @@ +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#----------------------------------------------------------------------- +#some test synonym mappings unlikely to appear in real input text +aaafoo => aaabar +bbbfoo => bbbfoo bbbbar +cccfoo => cccbar cccbaz +fooaaa,baraaa,bazaaa + +# Some synonym groups specific to this example +GB,gib,gigabyte,gigabytes +MB,mib,megabyte,megabytes +Television, Televisions, TV, TVs +#notice we use "gib" instead of "GiB" so any WordDelimiterFilter coming +#after us won't split it into two words. + +# Synonym mappings can be used for spelling correction too +pixima => pixma + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties new file mode 100644 index 0000000..4e16ece --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties @@ -0,0 +1 @@ +name=jsonCollection \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv new file mode 100644 index 0000000..5657a89 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv @@ -0,0 +1,2 @@ +John,Doe,8,Math,term1,90 +John,Doe,8,Biology,term1,86 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json new 
file mode 100644 index 0000000..5cca807 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json @@ -0,0 +1,15 @@ +{ + "first": "John", + "last": "Doe", + "grade": 8, + "exams": [ + { + "subject": "Math", + "test" : "term1", + "marks":90}, + { + "subject": "Biology", + "test" : "term1", + "marks":86} + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json new file mode 100644 index 0000000..cea939b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json @@ -0,0 +1,18 @@ +[ +{ + "first": "John", + "last": "Doe", + "grade": 8, + "subject": "Math", + "test" : "term1", + "marks": 90 +}, +{ + "first": "John", + "last": "Doe", + "grade": 8, + "subject": "Biology", + "test" : "term1", + "marks": 86 +} +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml new file mode 100644 index 0000000..4622e0d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml @@ -0,0 +1,18 @@ +<add> + <doc> + <field name="first">John</field> + <field name="last">Doe</field> + <field name="grade">8</field> + <field name="subject">Math</field> + <field name="test">term1</field> + <field name="marks">90</field> + </doc> + <doc> + <field name="first">John</field> + <field name="last">Doe</field> + <field name="grade">8</field> + <field name="subject">Biology</field> + <field name="test">term1</field> + <field name="marks">86</field> + </doc> +</add> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml new file mode 100644 index 0000000..e027a63 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>nifi-solr-bundle</artifactId> + <packaging>pom</packaging> + + <description>A bundle of processors that can store and retrieve data from Apache Solr</description> + + <properties> + <solr.version>5.1.0</solr.version> + </properties> + + <modules> + <module>nifi-solr-processors</module> + <module>nifi-solr-nar</module> + </modules> + +</project>
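A note for readers tracing the tests above back to plain Solr: the least self-describing case is testUpdateWithCustomJson, whose dynamic properties (split plus f.1 through f.6) are passed straight through to Solr's /update/json/docs handler to split the nested exams array and map JSON paths onto schema fields. The following standalone SolrJ sketch sends the same test-custom-json-single-doc.json content with the same parameters the test passes; it is illustrative only, and the HttpSolrClient base URL, core name, and local file path are assumptions rather than anything defined in this commit (the tests themselves run against an embedded server created by EmbeddedSolrServerFactory).

import java.io.File;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;

public class CustomJsonUpdateSketch {

    public static void main(String[] args) throws Exception {
        // Assumed local core; adjust the URL and core name for your environment.
        try (SolrClient solr = new HttpSolrClient("http://localhost:8983/solr/testCollection")) {
            ContentStreamUpdateRequest request = new ContentStreamUpdateRequest("/update/json/docs");
            // Assumed local copy of the test data file shipped in this commit.
            request.addFile(new File("test-custom-json-single-doc.json"), "application/json");
            // Split each element of the nested "exams" array into its own Solr document...
            request.setParam("split", "/exams");
            // ...and map JSON paths onto schema fields, mirroring the f.1..f.6 test properties.
            request.setParam("f.1", "first:/first");
            request.setParam("f.2", "last:/last");
            request.setParam("f.3", "grade:/grade");
            request.setParam("f.4", "subject:/exams/subject");
            request.setParam("f.5", "test:/exams/test");
            request.setParam("f.6", "marks:/exams/marks");

            // Same SolrClient.request(...) entry point that the processor tests mock.
            solr.request(request);
            solr.commit();
        }
    }
}

Run against a live core with the testCollection schema, this should yield the same two documents that verifySolrDocuments checks for in the tests above.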
