Repository: nifi
Updated Branches:
  refs/heads/master 0390c0f19 -> e3f472079


NIFI-4035 - Adding PutSolrRecord Processor that reads NiFi records and indexes 
them into Solr as SolrDocuments

Adding Test Cases for PutSolrRecord Processor

Adding PutSolrRecord Processor in the list of Processors

Resolving checkstyle errors

Resolving checkstyle errors in test classes

Adding License information and additional information about the processor

1. Implementing Batch Indexing 2. Changes for nested records 3. Removing 
MockRecordParser

Fixing bugs with nested records

Updating version of dependencies

Setting Expression Language Scope

This closes #2561.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3f47207
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3f47207
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3f47207

Branch: refs/heads/master
Commit: e3f4720797a9777264d1732b2e1475b8f344a8f0
Parents: 0390c0f
Author: abhinavrohatgi30 <[email protected]>
Authored: Sat Mar 17 16:35:06 2018 +0100
Committer: Bryan Bende <[email protected]>
Committed: Tue Apr 24 10:50:40 2018 -0400

----------------------------------------------------------------------
 .../nifi-solr-processors/pom.xml                |   6 +
 .../nifi/processors/solr/PutSolrRecord.java     | 337 ++++++++
 .../apache/nifi/processors/solr/SolrUtils.java  | 140 +++-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 114 +++
 .../nifi/processors/solr/TestPutSolrRecord.java | 832 +++++++++++++++++++
 .../solr/testCollection/conf/schema.xml         |  66 +-
 7 files changed, 1465 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
index 5684f37..ea1499a 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
@@ -94,6 +94,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
new file mode 100644
index 0000000..0b3e107
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
+
+
+@Tags({"Apache", "Solr", "Put", "Send","Record"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
+@DynamicProperty(name="A Solr request parameter name", value="A Solr request 
parameter value",
+        description="These parameters will be passed to Solr on the request")
+public class PutSolrRecord extends SolrProcessor {
+
+    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
+            .Builder().name("Solr Update Path").displayName("Solr Update Path")
+            .description("The path in Solr to post the Flowfile Records")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("/update")
+            .build();
+
+    public static final PropertyDescriptor FIELDS_TO_INDEX  = new 
PropertyDescriptor
+            .Builder().name("Fields To Index").displayName("Fields To Index")
+            .displayName("Fields To Index")
+            .description("Comma-separated list of field names to write")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor COMMIT_WITHIN = new 
PropertyDescriptor
+            .Builder().name("Commit Within").displayName("Commit Within")
+            .description("The number of milliseconds before the given update 
is committed")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("5000")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
+            .Builder().name("Batch Size").displayName("Batch Size")
+            .description("The number of solr documents to index per batch")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("500")
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The original FlowFile")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed for any reason other than Solr 
being unreachable")
+            .build();
+
+    public static final Relationship REL_CONNECTION_FAILURE = new 
Relationship.Builder()
+            .name("connection_failure")
+            .description("FlowFiles that failed because Solr is unreachable")
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            
.name("put-solr-record-record-reader").displayName("put-solr-record-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    public static final String COLLECTION_PARAM_NAME = "collection";
+    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
+    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    private static final String EMPTY_STRING = "";
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SOLR_TYPE);
+        descriptors.add(SOLR_LOCATION);
+        descriptors.add(COLLECTION);
+        descriptors.add(UPDATE_PATH);
+        descriptors.add(RECORD_READER);
+        descriptors.add(FIELDS_TO_INDEX);
+        descriptors.add(COMMIT_WITHIN);
+        descriptors.add(JAAS_CLIENT_APP_NAME);
+        descriptors.add(BASIC_USERNAME);
+        descriptors.add(BASIC_PASSWORD);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(SOLR_SOCKET_TIMEOUT);
+        descriptors.add(SOLR_CONNECTION_TIMEOUT);
+        descriptors.add(SOLR_MAX_CONNECTIONS);
+        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
+        descriptors.add(ZK_CLIENT_TIMEOUT);
+        descriptors.add(ZK_CONNECTION_TIMEOUT);
+        descriptors.add(BATCH_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_CONNECTION_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value to send for the '" + 
propertyDescriptorName + "' request parameter")
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .dynamic(true)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final AtomicReference<Exception> error = new AtomicReference<>(null);
+        final AtomicReference<Exception> connectionError = new 
AtomicReference<>(null);
+
+        final boolean isSolrCloud = 
SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
+        final String collection = 
context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
+        final Long commitWithin = 
context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
+        final String contentStreamPath = 
context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final MultiMapSolrParams requestParams = new 
MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final String fieldsToIndex = 
context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        final Long batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
+
+        final List<String> fieldList = new ArrayList<>();
+        if (!StringUtils.isBlank(fieldsToIndex)) {
+            Arrays.stream(fieldsToIndex.split(",")).forEach( f -> 
fieldList.add(f.trim()));
+        }
+        StopWatch timer = new StopWatch(true);
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            Record record;
+            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
+            try {
+                while ((record = reader.nextRecord()) != null) {
+                SolrInputDocument inputDoc = new SolrInputDocument();
+                writeRecord(record, inputDoc,fieldList,EMPTY_STRING);
+                inputDocumentList.add(inputDoc);
+                if(inputDocumentList.size()==batchSize) {
+                    index(isSolrCloud, collection, commitWithin, 
contentStreamPath, requestParams, inputDocumentList);
+                    inputDocumentList = new ArrayList<>();
+                }
+               index(isSolrCloud, collection, commitWithin, contentStreamPath, 
requestParams, inputDocumentList);
+            }
+            } catch (SolrException e) {
+                error.set(e);
+            } catch (SolrServerException e) {
+                if (causedByIOException(e)) {
+                    //Exit in case of a solr connection error
+                    connectionError.set(e);
+                } else {
+                    error.set(e);
+                }
+            } catch (IOException e) {
+                //Exit in case of a solr connection error
+                connectionError.set(e);
+            }
+        } catch (final IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            getLogger().error("Could not parse incoming data", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        timer.stop();
+
+        if (error.get() != null) {
+            getLogger().error("Failed to send all the records of the {} to 
Solr due to {}; routing to failure",
+                    new Object[]{flowFile, error.get()});
+            session.transfer(flowFile, REL_FAILURE);
+        } else if (connectionError.get() != null) {
+            getLogger().error("Failed to send {} to Solr due to {}; routing to 
connection_failure",
+                    new Object[]{flowFile, connectionError.get()});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_CONNECTION_FAILURE);
+        } else {
+            StringBuilder transitUri = new StringBuilder("solr://");
+            transitUri.append(getSolrLocation());
+            if (isSolrCloud) {
+                transitUri.append(":").append(collection);
+            }
+
+            final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+            session.getProvenanceReporter().send(flowFile, 
transitUri.toString(), duration, true);
+            getLogger().info("Successfully sent {} to Solr in {} millis", new 
Object[]{flowFile, duration});
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
+
+    private void index(boolean isSolrCloud, String collection, Long 
commitWithin, String contentStreamPath, MultiMapSolrParams requestParams, 
List<SolrInputDocument> inputDocumentList)
+            throws IOException, SolrServerException,SolrException {
+        UpdateRequest request = new UpdateRequest(contentStreamPath);
+        request.setParams(new ModifiableSolrParams());
+
+        // add the extra params, don't use 'set' in case of repeating params
+        Iterator<String> paramNames = 
requestParams.getParameterNamesIterator();
+        while (paramNames.hasNext()) {
+            String paramName = paramNames.next();
+            for (String paramValue : requestParams.getParams(paramName)) {
+                request.getParams().add(paramName, paramValue);
+            }
+        }
+
+        // specify the collection for SolrCloud
+        if (isSolrCloud) {
+            request.setParam(COLLECTION_PARAM_NAME, collection);
+        }
+
+        if (commitWithin != null && commitWithin > 0) {
+            request.setParam(COMMIT_WITHIN_PARAM_NAME, 
commitWithin.toString());
+        }
+
+        // if a username and password were provided then pass them for basic 
auth
+        if (isBasicAuthEnabled()) {
+            request.setBasicAuthCredentials(getUsername(), getPassword());
+        }
+        request.add(inputDocumentList);
+        UpdateResponse response = request.process(getSolrClient());
+        getLogger().debug("Got {} response from Solr", new 
Object[]{response.getStatus()});
+        inputDocumentList.clear();
+    }
+
+    private boolean causedByIOException(SolrServerException e) {
+        boolean foundIOException = false;
+        Throwable cause = e.getCause();
+        while (cause != null) {
+            if (cause instanceof IOException) {
+                foundIOException = true;
+                break;
+            }
+            cause = cause.getCause();
+        }
+        return foundIOException;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
index ae83b1c..2b324ec 100755
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -33,6 +33,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.ListRecordSet;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -40,6 +41,8 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -56,7 +59,13 @@ import org.apache.solr.common.params.MultiMapSolrParams;
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -223,7 +232,7 @@ public class SolrUtils {
             
httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
         }
 
-        if 
(SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
+        if 
(SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue()))
 {
             return new HttpSolrClient(solrLocation, httpClient);
         } else {
             final String collection = 
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
@@ -325,4 +334,133 @@ public class SolrUtils {
 
         return paramsMap;
     }
+
+    /**
+     * Writes each Record as a SolrInputDocument.
+     */
+    public static void writeRecord(final Record record, final 
SolrInputDocument inputDocument,final List<String> fieldsToIndex,String 
parentFieldName)
+            throws IOException {
+        RecordSchema schema = record.getSchema();
+
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            final RecordField field = schema.getField(i);
+            String fieldName;
+            if(!StringUtils.isBlank(parentFieldName)) {
+                // Prefixing parent field name
+                fieldName = parentFieldName+"_"+field.getFieldName();
+            }else{
+                fieldName = field.getFieldName();
+            }
+            final Object value = record.getValue(field);
+            if (value == null) {
+                continue;
+            }else {
+                final DataType dataType = 
schema.getDataType(field.getFieldName()).get();
+                writeValue(inputDocument, value, fieldName, 
dataType,fieldsToIndex);
+            }
+        }
+    }
+
+    private static void writeValue(final SolrInputDocument inputDocument, 
final Object value, final String fieldName, final DataType dataType,final 
List<String> fieldsToIndex) throws IOException {
+        final DataType chosenDataType = dataType.getFieldType() == 
RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) 
dataType) : dataType;
+        final Object coercedValue = DataTypeUtils.convertType(value, 
chosenDataType, fieldName);
+        if (coercedValue == null) {
+            return;
+        }
+
+        switch (chosenDataType.getFieldType()) {
+            case DATE: {
+                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, 
coercedValue);
+                    
addFieldToSolrDocument(inputDocument,fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                } else {
+                    
addFieldToSolrDocument(inputDocument,fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                }
+                break;
+            }
+            case TIMESTAMP: {
+                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    LocalDateTime localDateTime = 
getLocalDateTimeFromEpochTime(fieldName, coercedValue);
+                    
addFieldToSolrDocument(inputDocument,fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                } else {
+                    
addFieldToSolrDocument(inputDocument,fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                }
+                break;
+            }
+            case DOUBLE:
+                
addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toDouble(coercedValue,
 fieldName),fieldsToIndex);
+                break;
+            case FLOAT:
+                
addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toFloat(coercedValue,
 fieldName),fieldsToIndex);
+                break;
+            case LONG:
+                
addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toLong(coercedValue,
 fieldName),fieldsToIndex);
+                break;
+            case INT:
+            case BYTE:
+            case SHORT:
+                
addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toInteger(coercedValue,
 fieldName),fieldsToIndex);
+                break;
+            case CHAR:
+            case STRING:
+                
addFieldToSolrDocument(inputDocument,fieldName,coercedValue.toString(),fieldsToIndex);
+                break;
+            case BIGINT:
+                if (coercedValue instanceof Long) {
+                    addFieldToSolrDocument(inputDocument,fieldName,(Long) 
coercedValue,fieldsToIndex);
+                } else {
+                    
addFieldToSolrDocument(inputDocument,fieldName,(BigInteger)coercedValue,fieldsToIndex);
+                }
+                break;
+            case BOOLEAN:
+                final String stringValue = coercedValue.toString();
+                if ("true".equalsIgnoreCase(stringValue)) {
+                    
addFieldToSolrDocument(inputDocument,fieldName,true,fieldsToIndex);
+                } else if ("false".equalsIgnoreCase(stringValue)) {
+                    
addFieldToSolrDocument(inputDocument,fieldName,false,fieldsToIndex);
+                } else {
+                    
addFieldToSolrDocument(inputDocument,fieldName,stringValue,fieldsToIndex);
+                }
+                break;
+            case RECORD: {
+                final Record record = (Record) coercedValue;
+                writeRecord(record, inputDocument,fieldsToIndex,fieldName);
+                break;
+            }
+            case ARRAY:
+            default:
+                if (coercedValue instanceof Object[]) {
+                    final Object[] values = (Object[]) coercedValue;
+                    for(Object element : values){
+                        if(element instanceof  Record){
+                            
writeRecord((Record)element,inputDocument,fieldsToIndex,fieldName);
+                        }else{
+                            
addFieldToSolrDocument(inputDocument,fieldName,coercedValue.toString(),fieldsToIndex);
+                        }
+                    }
+                } else {
+                    
addFieldToSolrDocument(inputDocument,fieldName,coercedValue.toString(),fieldsToIndex);
+                }
+                break;
+        }
+    }
+
+    private static void addFieldToSolrDocument(SolrInputDocument 
inputDocument,String fieldName,Object fieldValue,List<String> fieldsToIndex){
+        if ((!fieldsToIndex.isEmpty() && fieldsToIndex.contains(fieldName)) || 
fieldsToIndex.isEmpty()){
+            inputDocument.addField(fieldName, fieldValue);
+        }
+    }
+
+    private static LocalDate getLocalDateFromEpochTime(String fieldName, 
Object coercedValue) {
+        Long date = DataTypeUtils.toLong(coercedValue, fieldName);
+        return 
Instant.ofEpochMilli(date).atZone(ZoneId.systemDefault()).toLocalDate();
+    }
+
+    private static LocalDateTime getLocalDateTimeFromEpochTime(String 
fieldName, Object coercedValue) {
+        Long date = DataTypeUtils.toLong(coercedValue, fieldName);
+        return 
Instant.ofEpochMilli(date).atZone(ZoneId.systemDefault()).toLocalDateTime();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index cc05423..e9cff2b 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,5 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.solr.PutSolrContentStream
+org.apache.nifi.processors.solr.PutSolrRecord
 org.apache.nifi.processors.solr.GetSolr
 org.apache.nifi.processors.solr.QuerySolr

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
new file mode 100644
index 0000000..12315b9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
@@ -0,0 +1,114 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSolrRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>Usage Example</h2>
+<p>
+    This processor reads the NiFi record and indexes it into Solr as a 
SolrDocument.
+    Any properties added to this processor by the user are
+    passed to Solr on the update request. It is required that the input record 
reader
+    should be specified for this processor. Additionally, if only selected 
fields of a record are to be indexed
+    you can specify the field name as a comma-separated list under the fields 
property.
+</p>
+<p>
+    Example: To specify specific fields of the record  to be indexed:
+</p>
+<ul>
+    <li><strong>Fields To Index</strong>: field1,field2,field3</li>
+</ul>
+<p>
+    <strong>NOTE:</strong> In case of nested the field names should be 
prefixed with the parent field name.
+</p>
+<ul>
+    <li><strong>Fields To Index</strong>: 
parentField1,parentField2,<strong>parentField3_childField1</strong>,<strong>parentField3_childField2</strong></li>
+</ul>
+<p>
+   In case of nested records, this processor would flatten all the nested 
records into a single solr document, the field name of the field in a child 
document would follow the format of <strong>{Parent Field Name}_{Child Field 
Name}</strong>.
+</p>
+<p>
+    Example:
+    <strong>For a record created from the following json:</strong><br/>
+</p>
+<pre>
+    {
+        "first": "Abhi",
+        "last": "R",
+        "grade": 8,
+        "exams": {
+            "subject": "Maths",
+            "test" : "term1",
+            "marks" : 90
+        }
+    }
+</pre>
+<p>
+    <strong>The corresponding solr document would be represented  as 
below:</strong><br/>
+</p>
+<pre>
+    {
+        "first": "Abhi",
+        "last": "R",
+        "grade": 8,
+        "exams_subject": "Maths",
+        "exams_test" : "term1",
+        "exams_marks" : 90
+    }
+</pre>
+<p>
+    Similarly in case of an array of  nested records, this processor would 
flatten all the nested records into a single solr document, the field name of 
the field in a child document would follow the format of <strong>{Parent Field 
Name}_{Child Field Name}</strong> and would be a multivalued field in the solr 
document.
+    Example:
+    <strong>For a record created from the following json:</strong><br/>
+</p>
+<pre>
+    {
+    "first": "Abhi",
+    "last": "R",
+    "grade": 8,
+    "exams": [
+        {
+            "subject": "Maths",
+            "test" : "term1",
+            "marks" : 90
+        },
+        {
+            "subject": "Physics",
+            "test" : "term1",
+            "marks" : 95
+        }
+    ]
+    }
+</pre>
+<p>
+    <strong>The corresponding solr document would be represented  as 
below:</strong><br/>
+</p>
+<pre>
+    {
+        "first": "Abhi",
+        "last": "R",
+        "grade": 8,
+        "exams_subject": ["Maths","Physics"]
+        "exams_test" : ["term1","term1"]
+        "exams_marks" : [90,95]
+    }
+</pre>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
new file mode 100644
index 0000000..17801aa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for PutSolrRecord Processor
+ */
+public class TestPutSolrRecord {
+
+    private static final String DEFAULT_SOLR_CORE = "testCollection";
+
+    @Test
+    public void testPutSolrOnTriggerIndex() throws IOException, 
InitializationException, SolrServerException {
+        final SolrClient solrClient = 
createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner= createDefaultTestRunner(proc);
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("subject", RecordFieldType.STRING);
+        recordParser.addSchemaField("test", RecordFieldType.STRING);
+        recordParser.addSchemaField("marks", RecordFieldType.INT);
+
+        SolrDocument solrDocument = new SolrDocument();
+        solrDocument.put("id",1);
+        solrDocument.put("first","Abhinav");
+        solrDocument.put("last","R");
+        solrDocument.put("grade",8);
+        solrDocument.put("subject","Chemistry");
+        solrDocument.put("test","term1");
+        solrDocument.put("marks",98);
+
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1, false);
+            verifySolrDocuments(proc.getSolrClient(), 
Collections.singletonList(solrDocument));
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 
0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testPutSolrOnTriggerIndexForANestedRecord() throws 
IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = 
createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner= createDefaultTestRunner(proc);
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("exam", RecordFieldType.RECORD);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("subject", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("test", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("marks", 
RecordFieldType.INT.getDataType()));
+        RecordSchema schema = new SimpleRecordSchema(fields);
+
+        Map<String,Object> values = new HashMap<>();
+        values.put("subject","Chemistry");
+        values.put("test","term1");
+        values.put("marks",98);
+        final Record record = new MapRecord(schema,values);
+
+        recordParser.addRecord(1, "Abhinav","R",8,record);
+
+
+        SolrDocument solrDocument = new SolrDocument();
+        solrDocument.put("id",1);
+        solrDocument.put("first","Abhinav");
+        solrDocument.put("last","R");
+        solrDocument.put("grade",8);
+        solrDocument.put("exam_subject","Chemistry");
+        solrDocument.put("exam_test","term1");
+        solrDocument.put("exam_marks",98);
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1, false);
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 
0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+            verifySolrDocuments(proc.getSolrClient(), 
Collections.singletonList(solrDocument));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+    @Test
+    public void testRecordParserExceptionShouldRoutToFailure() throws 
IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = 
createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner= createDefaultTestRunner(proc);
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+
+        recordParser.failAfter(0);
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1, false);
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 1);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 
0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 0);
+            verifySolrDocuments(proc.getSolrClient(),Collections.EMPTY_LIST);
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testPutSolrOnTriggerIndexForAnArrayOfNestedRecord() throws 
IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = 
createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner= createDefaultTestRunner(proc);
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("exams", RecordFieldType.ARRAY);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("subject", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("test", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("marks", 
RecordFieldType.INT.getDataType()));
+        RecordSchema schema = new SimpleRecordSchema(fields);
+
+        Map<String,Object> values1 = new HashMap<>();
+        values1.put("subject","Chemistry");
+        values1.put("test","term1");
+        values1.put("marks",98);
+        final Record record1 = new MapRecord(schema,values1);
+
+        Map<String,Object> values2 = new HashMap<>();
+        values2.put("subject","Maths");
+        values2.put("test","term1");
+        values2.put("marks",98);
+        final Record record2 = new MapRecord(schema,values2);
+
+        recordParser.addRecord(1, "Abhinav","R",8,new 
Record[]{record1,record2});
+
+        SolrDocument solrDocument = new SolrDocument();
+        solrDocument.put("id",1);
+        solrDocument.put("first","Abhinav");
+        solrDocument.put("last","R");
+        solrDocument.put("grade",8);
+        solrDocument.put("exams_subject", 
Stream.of("Chemistry","Maths").collect(Collectors.toList()));
+        
solrDocument.put("exams_test",Stream.of("term1","term1").collect(Collectors.toList()));
+        
solrDocument.put("exams_marks",Stream.of(98,98).collect(Collectors.toList()));
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1, false);
+            
verifySolrDocuments(solrClient,Collections.singletonList(solrDocument));
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 
0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+
+        }catch (Exception e){
+            e.printStackTrace();
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testCollectionExpressionLanguage() throws IOException, 
SolrServerException, InitializationException {
+        final String collection = "collection1";
+        final CollectionVerifyingProcessor proc = new 
CollectionVerifyingProcessor(collection);
+
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("subject", RecordFieldType.STRING);
+        recordParser.addSchemaField("test", RecordFieldType.STRING);
+        recordParser.addSchemaField("marks", RecordFieldType.INT);
+
+
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:9983");
+        runner.setProperty(SolrUtils.COLLECTION, "${solr.collection}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("solr.collection", collection);
+        attributes.put("id","1");
+        try {
+            runner.enqueue(new byte[0], attributes);
+            runner.run(1, false);
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 
0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testSolrServerExceptionShouldRouteToFailure() throws 
IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new SolrServerException("Invalid 
Document");
+        final ExceptionThrowingProcessor proc = new 
ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        MockRecordParser recordParser = new MockRecordParser();
+
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1,false);
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_FAILURE, 1);
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class), eq((String)null));
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void 
testSolrServerExceptionCausedByIOExceptionShouldRouteToConnectionFailure() 
throws IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new SolrServerException(new 
IOException("Error communicating with Solr"));
+        final ExceptionThrowingProcessor proc = new 
ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run();
+
+            
runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_CONNECTION_FAILURE, 1);
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class), eq((String)null));
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testSolrExceptionShouldRouteToFailure() throws IOException, 
SolrServerException, InitializationException {
+        final Throwable throwable = new 
SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error");
+        final ExceptionThrowingProcessor proc = new 
ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_FAILURE, 1);
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class), eq((String)null));
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoteSolrExceptionShouldRouteToFailure() throws 
IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new HttpSolrClient.RemoteSolrException(
+                "host", 401, "error", new NumberFormatException());
+        final ExceptionThrowingProcessor proc = new 
ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_FAILURE, 1);
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class), eq((String)null));
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testIOExceptionShouldRouteToConnectionFailure() throws 
IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new IOException("Error communicating with 
Solr");
+        final ExceptionThrowingProcessor proc = new 
ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run();
+
+            
runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_CONNECTION_FAILURE, 1);
+            verify(proc.getSolrClient(), 
times(1)).request(any(SolrRequest.class), eq((String)null));
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testSolrTypeCloudShouldRequireCollection() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrRecord.class);
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"http://localhost:8443/solr";);
+        runner.assertNotValid();
+
+        runner.setProperty(SolrUtils.COLLECTION, "someCollection1");
+        runner.assertValid();
+    }
+
+
+    @Test
+    public void testSolrTypeStandardShouldNotRequireCollection() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrRecord.class);
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"http://localhost:8443/solr";);
+        runner.assertValid();
+    }
+
+    @Test
+    public void testHttpsUrlShouldRequireSSLContext() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrRecord.class);
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"https://localhost:8443/solr";);
+        runner.assertNotValid();
+
+        final SSLContextService sslContextService = new 
MockSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.enableControllerService(sslContextService);
+
+        runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testHttpUrlShouldNotAllowSSLContext() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrRecord.class);
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"http://localhost:8443/solr";);
+        runner.assertValid();
+
+        final SSLContextService sslContextService = new 
MockSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.enableControllerService(sslContextService);
+
+        runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testUsernamePasswordValidation() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrRecord.class);
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"http://localhost:8443/solr";);
+        runner.assertValid();
+
+        runner.setProperty(SolrUtils.BASIC_USERNAME, "user1");
+        runner.assertNotValid();
+
+        runner.setProperty(SolrUtils.BASIC_PASSWORD, "password");
+        runner.assertValid();
+
+        runner.setProperty(SolrUtils.BASIC_USERNAME, "");
+        runner.assertNotValid();
+
+        runner.setProperty(SolrUtils.BASIC_USERNAME, "${solr.user}");
+        runner.assertNotValid();
+
+        runner.setVariable("solr.user", "solrRocks");
+        runner.assertValid();
+
+        runner.setProperty(SolrUtils.BASIC_PASSWORD, "${solr.password}");
+        runner.assertNotValid();
+
+        runner.setVariable("solr.password", "solrRocksPassword");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testJAASClientAppNameValidation() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSolrRecord.class);
+        MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"http://localhost:8443/solr";);
+        runner.assertValid();
+
+        // clear the jaas config system property if it was set
+        final String jaasConfig = 
System.getProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP);
+        if (!StringUtils.isEmpty(jaasConfig)) {
+            System.clearProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP);
+        }
+
+        // should be invalid if we have a client name but not config file
+        runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "Client");
+        runner.assertNotValid();
+
+        // should be invalid if we have a client name that is not in the 
config file
+        final File jaasConfigFile = new 
File("src/test/resources/jaas-client.conf");
+        System.setProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP, 
jaasConfigFile.getAbsolutePath());
+        runner.assertNotValid();
+
+        // should be valid now that the name matches up with the config file
+        runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "SolrJClient");
+        runner.assertValid();
+    }
+
+
+    /**
+     * Creates a base TestRunner with Solr Type of standard.
+     */
+    private static TestRunner createDefaultTestRunner(PutSolrRecord processor) 
{
+        TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(SolrUtils.SOLR_TYPE, 
SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, 
"http://localhost:8443/solr";);
+        return runner;
+    }
+
+    // Override createSolrClient and return the passed in SolrClient
+    private class TestableProcessor extends PutSolrRecord {
+        private SolrClient solrClient;
+
+        public TestableProcessor(SolrClient solrClient) {
+            this.solrClient = solrClient;
+        }
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String 
solrLocation) {
+            return solrClient;
+        }
+    }
+
+    // Create an EmbeddedSolrClient with the given core name.
+    private static SolrClient createEmbeddedSolrClient(String coreName) throws 
IOException {
+        String relPath = TestPutSolrRecord.class.getProtectionDomain()
+                .getCodeSource().getLocation().getFile()
+                + "../../target";
+
+        return EmbeddedSolrServerFactory.create(
+                EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
+                coreName, relPath);
+    }
+
+    // Override the createSolrClient method to inject a custom SolrClient.
+    private class CollectionVerifyingProcessor extends PutSolrRecord {
+
+        private SolrClient mockSolrClient;
+
+        private final String expectedCollection;
+
+        public CollectionVerifyingProcessor(final String expectedCollection) {
+            this.expectedCollection = expectedCollection;
+        }
+
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String 
solrLocation) {
+            mockSolrClient = new SolrClient() {
+                @Override
+                public NamedList<Object> request(SolrRequest solrRequest, 
String s) throws SolrServerException, IOException {
+                    Assert.assertEquals(expectedCollection, 
solrRequest.getParams().get(PutSolrRecord.COLLECTION_PARAM_NAME));
+                    return new NamedList<>();
+                }
+
+                @Override
+                public void close() {
+
+                }
+
+            };
+            return mockSolrClient;
+        }
+
+    }
+
+    /**
+     * Verify that given SolrServer contains the expected SolrDocuments.
+     */
+    private static void verifySolrDocuments(SolrClient solrServer, 
Collection<SolrDocument> expectedDocuments)
+            throws IOException, SolrServerException {
+
+        solrServer.commit();
+
+        SolrQuery query = new SolrQuery("*:*");
+        QueryResponse qResponse = solrServer.query(query);
+        Assert.assertEquals(expectedDocuments.size(), 
qResponse.getResults().getNumFound());
+
+        // verify documents have expected fields and values
+        for (SolrDocument expectedDoc : expectedDocuments) {
+            boolean found = false;
+            for (SolrDocument solrDocument : qResponse.getResults()) {
+                boolean foundAllFields = true;
+                for (String expectedField : expectedDoc.getFieldNames()) {
+                    Object expectedVal = 
expectedDoc.getFirstValue(expectedField);
+                    Object actualVal = 
solrDocument.getFirstValue(expectedField);
+                    foundAllFields = expectedVal.equals(actualVal);
+                }
+
+                if (foundAllFields) {
+                    found = true;
+                    break;
+                }
+            }
+            Assert.assertTrue("Could not find " + expectedDoc, found);
+        }
+    }
+
+    // Override the createSolrClient method to inject a Mock.
+    private class ExceptionThrowingProcessor extends PutSolrRecord {
+
+        private SolrClient mockSolrClient;
+        private Throwable throwable;
+
+        public ExceptionThrowingProcessor(Throwable throwable) {
+            this.throwable = throwable;
+        }
+
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String 
solrLocation) {
+            mockSolrClient = Mockito.mock(SolrClient.class);
+            try {
+                when(mockSolrClient.request(any(SolrRequest.class),
+                        eq((String)null))).thenThrow(throwable);
+            } catch (SolrServerException|IOException e) {
+                Assert.fail(e.getMessage());
+            }
+            return mockSolrClient;
+        }
+
+    }
+
+
+    /**
+     * Mock implementation so we don't need to have a real keystore/truststore 
available for testing.
+     */
+    private class MockSSLContextService extends AbstractControllerService 
implements SSLContextService {
+
+        @Override
+        public SSLContext createSSLContext(ClientAuth clientAuth) throws 
ProcessException {
+            return null;
+        }
+
+        @Override
+        public String getTrustStoreFile() {
+            return null;
+        }
+
+        @Override
+        public String getTrustStoreType() {
+            return null;
+        }
+
+        @Override
+        public String getTrustStorePassword() {
+            return null;
+        }
+
+        @Override
+        public boolean isTrustStoreConfigured() {
+            return false;
+        }
+
+        @Override
+        public String getKeyStoreFile() {
+            return null;
+        }
+
+        @Override
+        public String getKeyStoreType() {
+            return null;
+        }
+
+        @Override
+        public String getKeyStorePassword() {
+            return null;
+        }
+
+        @Override
+        public String getKeyPassword() {
+            return null;
+        }
+
+        @Override
+        public boolean isKeyStoreConfigured() {
+            return false;
+        }
+
+        @Override
+        public String getSslAlgorithm() {
+            return null;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
index 58b9256..aae2ae5 100644
--- 
a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
+++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
@@ -1,31 +1,37 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<schema version="1.5" name="testCollection">
-
-    <fieldType name="string" class="solr.StrField"/>
-    <fieldType name="date" class="solr.TrieDateField" precisionStep="0" 
positionIncrementGap="0"/>
-    <fieldType name="int" class="solr.TrieIntField" precisionStep="0" 
positionIncrementGap="0"/>
-    <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" 
positionIncrementGap="0"/>
-    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" 
positionIncrementGap="0"/>
-    <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" 
positionIncrementGap="0"/>
-
-    <field name="_version_" type="long" indexed="true" stored="true"/>
-
-    <field name="first" type="string" indexed="true" stored="true" />
-    <field name="last" type="string" indexed="true" stored="true" />
-    <field name="grade" type="int" indexed="true" stored="true" />
-    <field name="marks" type="int" indexed="true" stored="true" />
-    <field name="test" type="string" indexed="true" stored="true" />
-    <field name="subject" type="string" indexed="true" stored="true" />
-
-    <field name="created" type="date" indexed="true" stored="true" />
-
-    <field name="id" type="string" indexed="true" stored="true" />
-    <field name="double_single" type="double" indexed="true" stored="true" />
-    <field name="integer_single" type="int" indexed="true" stored="true" />
-    <field name="integer_multi" type="int" indexed="true" stored="true"  
multiValued="true"/>
-    <field name="string_single" type="string" indexed="true" stored="true" />
-    <field name="string_multi" type="string" indexed="true" stored="true" 
multiValued="true"/>
-
-    <uniqueKey>id</uniqueKey>
-
+<?xml version="1.0" encoding="UTF-8" ?>
+<schema version="1.5" name="testCollection">
+
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="date" class="solr.TrieDateField" precisionStep="0" 
positionIncrementGap="0"/>
+    <fieldType name="int" class="solr.TrieIntField" precisionStep="0" 
positionIncrementGap="0"/>
+    <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" 
positionIncrementGap="0"/>
+    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" 
positionIncrementGap="0"/>
+    <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" 
positionIncrementGap="0"/>
+
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+
+    <field name="first" type="string" indexed="true" stored="true" />
+    <field name="last" type="string" indexed="true" stored="true" />
+    <field name="grade" type="int" indexed="true" stored="true" />
+    <field name="marks" type="int" indexed="true" stored="true" />
+    <field name="test" type="string" indexed="true" stored="true" />
+    <field name="subject" type="string" indexed="true" stored="true" />
+    <field name="exam_marks" type="int" indexed="true" stored="true"/>
+    <field name="exam_test" type="string" indexed="true" stored="true"/>
+    <field name="exam_subject" type="string" indexed="true" stored="true"/>
+    <field name="exams_marks" type="int" indexed="true" stored="true" 
multiValued="true"/>
+    <field name="exams_test" type="string" indexed="true" stored="true" 
multiValued="true"/>
+    <field name="exams_subject" type="string" indexed="true" stored="true" 
multiValued="true"/>
+
+    <field name="created" type="date" indexed="true" stored="true" />
+
+    <field name="id" type="string" indexed="true" stored="true" />
+    <field name="double_single" type="double" indexed="true" stored="true" />
+    <field name="integer_single" type="int" indexed="true" stored="true" />
+    <field name="integer_multi" type="int" indexed="true" stored="true"  
multiValued="true"/>
+    <field name="string_single" type="string" indexed="true" stored="true" />
+    <field name="string_multi" type="string" indexed="true" stored="true" 
multiValued="true"/>
+
+    <uniqueKey>id</uniqueKey>
+
 </schema>
\ No newline at end of file

Reply via email to