[ https://issues.apache.org/jira/browse/NIFI-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406498#comment-16406498 ]

ASF GitHub Bot commented on NIFI-4035:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175809550
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument, final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            // skip null values, and skip any field not in the configured "fields to index" list
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            } else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType, final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    // LocalDate has no time component, so move to the start of day before formatting as a date-time
    +                    inputDocument.addField(fieldName, localDate.atStartOfDay().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                } else {
    +                    inputDocument.addField(fieldName, LocalDate.parse(stringValue).atStartOfDay().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName, localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                } else {
    +                    // the default timestamp format is not ISO-8601, so parse with an explicit formatter
    +                    inputDocument.addField(fieldName, LocalDateTime.parse(stringValue, DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName, DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName, DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName, DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName, DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName, coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName, (Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName, (BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName, true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName, false);
    +                } else {
    +                    inputDocument.addField(fieldName, stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff ---
    
    I think we have to handle it, since someone can specify a field name in "fields to index" that refers to a field of type record.
    
    I think it makes sense to have a property like "Nested Field Names" with choices for "Fully Qualified" and "Child Only" (or something like that); a rough sketch of the idea follows.
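    
    Something along these lines, maybe (just a sketch to illustrate; the NestedFieldNameMode enum, writeRecordValue, and the traversal are my assumptions, not what the PR does today):
    
        // Hypothetical: write the fields of a nested record, naming them
        // according to the proposed "Nested Field Names" property.
        enum NestedFieldNameMode { FULLY_QUALIFIED, CHILD_ONLY }
    
        private static void writeRecordValue(final SolrInputDocument inputDocument, final Record child,
                final String parentFieldName, final NestedFieldNameMode mode,
                final List<String> fieldsToIndex) throws IOException {
            for (final RecordField childField : child.getSchema().getFields()) {
                final Object childValue = child.getValue(childField);
                if (childValue == null) {
                    continue;
                }
                // "Fully Qualified" -> "exams.subject", "Child Only" -> "subject"
                final String solrFieldName = mode == NestedFieldNameMode.FULLY_QUALIFIED
                        ? parentFieldName + "." + childField.getFieldName()
                        : childField.getFieldName();
                writeValue(inputDocument, childValue, solrFieldName, childField.getDataType(), fieldsToIndex);
            }
        }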
    
    This lines up with how Solr's JSON update works:
    
    https://lucene.apache.org/solr/guide/6_6/transforming-and-indexing-custom-json.html#transforming-and-indexing-custom-json
    
    The relevant part shows:
    
    The default behavior is to use the fully qualified name (FQN) of the node. 
So, if we don’t define any field mappings, like this:
    
    curl 'http://localhost:8983/solr/my_collection/update/json/docs?split=/exams'\
        -H 'Content-type:application/json' -d '
    {
      "first": "John",
      "last": "Doe",
      "grade": 8,
      "exams": [
        {
          "subject": "Maths",
          "test"   : "term1",
          "marks"  : 90},
        {
          "subject": "Biology",
          "test"   : "term1",
          "marks"  : 86}
      ]
    }'
    
    The indexed documents would be added to the index with fields that look 
like this:
    
    {
      "first":"John",
      "last":"Doe",
      "grade":8,
      "exams.subject":"Maths",
      "exams.test":"term1",
      "exams.marks":90},
    {
      "first":"John",
      "last":"Doe",
      "grade":8,
      "exams.subject":"Biology",
      "exams.test":"term1",
      "exams.marks":86}
    



> Implement record-based Solr processors
> --------------------------------------
>
>                 Key: NIFI-4035
>                 URL: https://issues.apache.org/jira/browse/NIFI-4035
>             Project: Apache NiFi
>          Issue Type: Improvement
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Bryan Bende
>            Priority: Minor
>
> Now that we have record readers and writers, we should implement variants of 
> the existing Solr processors that are record-based...
> Processors to consider:
> * PutSolrRecord - uses a configured record reader to read an incoming flow 
> file and insert records to Solr
> * GetSolrRecord - extracts records from Solr and uses a configured record 
> writer to write them to a flow file



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
