[
https://issues.apache.org/jira/browse/NIFI-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406657#comment-16406657
]
ASF GitHub Bot commented on NIFI-4035:
--------------------------------------
Github user abhinavrohatgi30 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2561#discussion_r175839474
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
---
@@ -280,5 +291,115 @@ public SolrInputDocument
toSolrInputDocument(SolrDocument d) {
}
}
+ /**
+ * Writes each Record as a SolrInputDocument.
+ */
+ public static void writeRecord(final Record record, final RecordSchema
writeSchema, final SolrInputDocument inputDocument,final List<String>
fieldsToIndex)
+ throws IOException {
+ RecordSchema schema = record.getSchema();
+
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ final RecordField field = schema.getField(i);
+ final String fieldName = field.getFieldName();
+ final Object value = record.getValue(field);
+ if (value == null || (!fieldsToIndex.isEmpty() &&
!fieldsToIndex.contains(fieldName))) {
+ continue;
+ }else {
+ final DataType dataType =
schema.getDataType(fieldName).get();
+ writeValue(inputDocument, value, fieldName,
dataType,fieldsToIndex);
+ }
+ }
+ }
+ private static void writeValue(final SolrInputDocument inputDocument,
final Object value, final String fieldName, final DataType dataType,final
List<String> fieldsToIndex) throws IOException {
+ final DataType chosenDataType = dataType.getFieldType() ==
RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType)
dataType) : dataType;
+ final Object coercedValue = DataTypeUtils.convertType(value,
chosenDataType, fieldName);
+ if (coercedValue == null) {
+ return;
+ }
+
+ switch (chosenDataType.getFieldType()) {
+ case DATE: {
+ final String stringValue =
DataTypeUtils.toString(coercedValue, () ->
DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+ if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+ LocalDate localDate =
getLocalDateFromEpochTime(fieldName, coercedValue);
+
inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ } else {
+
inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ }
+ break;
+ }
+ case TIMESTAMP: {
+ final String stringValue =
DataTypeUtils.toString(coercedValue, () ->
DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+ if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+ LocalDateTime localDateTime =
getLocalDateTimeFromEpochTime(fieldName, coercedValue);
+
inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ } else {
+
inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ }
+ break;
+ }
+ case DOUBLE:
+
inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue,
fieldName));
+ break;
+ case FLOAT:
+
inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue,
fieldName));
+ break;
+ case LONG:
+
inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
+ break;
+ case INT:
+ case BYTE:
+ case SHORT:
+
inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue,
fieldName));
+ break;
+ case CHAR:
+ case STRING:
+ inputDocument.addField(fieldName,coercedValue.toString());
+ break;
+ case BIGINT:
+ if (coercedValue instanceof Long) {
+ inputDocument.addField(fieldName,(Long) coercedValue);
+ } else {
+ inputDocument.addField(fieldName,(BigInteger)
coercedValue);
+ }
+ break;
+ case BOOLEAN:
+ final String stringValue = coercedValue.toString();
+ if ("true".equalsIgnoreCase(stringValue)) {
+ inputDocument.addField(fieldName,true);
+ } else if ("false".equalsIgnoreCase(stringValue)) {
+ inputDocument.addField(fieldName,false);
+ } else {
+ inputDocument.addField(fieldName,stringValue);
+ }
+ break;
+ case RECORD: {
--- End diff --
With the current code, it writes a single Solr document for a record and
flattens all the nested records into that single Solr document.
So if there is an array of nested records, it would create multiple fields
with the same key in the Solr document, which would eventually mean that the
field is indexed as multivalued in Solr — assuming the schema has defined the
field to be multivalued; otherwise it would fail to index.
> Implement record-based Solr processors
> --------------------------------------
>
> Key: NIFI-4035
> URL: https://issues.apache.org/jira/browse/NIFI-4035
> Project: Apache NiFi
> Issue Type: Improvement
> Affects Versions: 1.2.0, 1.3.0
> Reporter: Bryan Bende
> Priority: Minor
>
> Now that we have record readers and writers, we should implement variants of
> the existing Solr processors that are record-based...
> Processors to consider:
> * PutSolrRecord - uses a configured record reader to read an incoming flow
> file and insert records to Solr
> * GetSolrRecord - extracts records from Solr and uses a configured record
> writer to write them to a flow file
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)