Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2561#discussion_r175809550
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
---
@@ -280,5 +291,115 @@ public SolrInputDocument
toSolrInputDocument(SolrDocument d) {
}
}
+ /**
+ * Writes each Record as a SolrInputDocument.
+ */
+ public static void writeRecord(final Record record, final RecordSchema
writeSchema, final SolrInputDocument inputDocument,final List<String>
fieldsToIndex)
+ throws IOException {
+ RecordSchema schema = record.getSchema();
+
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ final RecordField field = schema.getField(i);
+ final String fieldName = field.getFieldName();
+ final Object value = record.getValue(field);
+ if (value == null || (!fieldsToIndex.isEmpty() &&
!fieldsToIndex.contains(fieldName))) {
+ continue;
+ }else {
+ final DataType dataType =
schema.getDataType(fieldName).get();
+ writeValue(inputDocument, value, fieldName,
dataType,fieldsToIndex);
+ }
+ }
+ }
+ private static void writeValue(final SolrInputDocument inputDocument,
final Object value, final String fieldName, final DataType dataType,final
List<String> fieldsToIndex) throws IOException {
+ final DataType chosenDataType = dataType.getFieldType() ==
RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType)
dataType) : dataType;
+ final Object coercedValue = DataTypeUtils.convertType(value,
chosenDataType, fieldName);
+ if (coercedValue == null) {
+ return;
+ }
+
+ switch (chosenDataType.getFieldType()) {
+ case DATE: {
+ final String stringValue =
DataTypeUtils.toString(coercedValue, () ->
DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+ if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+ LocalDate localDate =
getLocalDateFromEpochTime(fieldName, coercedValue);
+
inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ } else {
+
inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ }
+ break;
+ }
+ case TIMESTAMP: {
+ final String stringValue =
DataTypeUtils.toString(coercedValue, () ->
DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+ if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+ LocalDateTime localDateTime =
getLocalDateTimeFromEpochTime(fieldName, coercedValue);
+
inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ } else {
+
inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
+ }
+ break;
+ }
+ case DOUBLE:
+
inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue,
fieldName));
+ break;
+ case FLOAT:
+
inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue,
fieldName));
+ break;
+ case LONG:
+
inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
+ break;
+ case INT:
+ case BYTE:
+ case SHORT:
+
inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue,
fieldName));
+ break;
+ case CHAR:
+ case STRING:
+ inputDocument.addField(fieldName,coercedValue.toString());
+ break;
+ case BIGINT:
+ if (coercedValue instanceof Long) {
+ inputDocument.addField(fieldName,(Long) coercedValue);
+ } else {
+ inputDocument.addField(fieldName,(BigInteger)
coercedValue);
+ }
+ break;
+ case BOOLEAN:
+ final String stringValue = coercedValue.toString();
+ if ("true".equalsIgnoreCase(stringValue)) {
+ inputDocument.addField(fieldName,true);
+ } else if ("false".equalsIgnoreCase(stringValue)) {
+ inputDocument.addField(fieldName,false);
+ } else {
+ inputDocument.addField(fieldName,stringValue);
+ }
+ break;
+ case RECORD: {
--- End diff --
I think we have to handle it since someone can specify a field name in
"fields to index" that could be of type record.
I think it makes sense to have a property like "Nested Field Names" with
choices for "Fully Qualified" and "Child Only" (or something like that).
This lines up with how Solr's JSON update works:
https://lucene.apache.org/solr/guide/6_6/transforming-and-indexing-custom-json.html#transforming-and-indexing-custom-json
The part that shows....
The default behavior is to use the fully qualified name (FQN) of the node.
So, if we don't define any field mappings, like this:
curl
'http://localhost:8983/solr/my_collection/update/json/docs?split=/exams'\
-H 'Content-type:application/json' -d '
{
"first": "John",
"last": "Doe",
"grade": 8,
"exams": [
{
"subject": "Maths",
"test" : "term1",
"marks" : 90},
{
"subject": "Biology",
"test" : "term1",
"marks" : 86}
]
}'
The indexed documents would be added to the index with fields that look
like this:
{
"first":"John",
"last":"Doe",
"grade":8,
"exams.subject":"Maths",
"exams.test":"term1",
"exams.marks":90},
{
"first":"John",
"last":"Doe",
"grade":8,
"exams.subject":"Biology",
"exams.test":"term1",
"exams.marks":86}
---