ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r532728519
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -376,13 +483,92 @@ private String getFromRecordPath(Record record,
RecordPath path, final String fa
);
}
- fieldValue.updateValue(null);
+ if (!retain) {
+ fieldValue.updateValue(null);
+ }
+
+ return fieldValue.getValue().toString();
+ } else {
+ return fallback;
+ }
+ }
+
+ private Object getTimestampFromRecordPath(final Record record, final
RecordPath path, final String fallback,
+ final boolean retain) {
+ if (path == null) {
+ return fallback;
+ }
+
+ final RecordPathResult result = path.evaluate(record);
+ final Optional<FieldValue> value =
result.getSelectedFields().findFirst();
+ if (value.isPresent() && value.get().getValue() != null) {
+ final FieldValue fieldValue = value.get();
+
+ final DataType dataType = fieldValue.getField().getDataType();
+ final String fieldName = fieldValue.getField().getFieldName();
+ final DataType chosenDataType = dataType.getFieldType() ==
RecordFieldType.CHOICE
+ ? DataTypeUtils.chooseDataType(value, (ChoiceDataType)
dataType)
+ : dataType;
+ final Object coercedValue =
DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
+ if (coercedValue == null) {
+ return null;
+ }
+
+ final Object returnValue;
+ switch (chosenDataType.getFieldType()) {
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ final String format;
+ switch (chosenDataType.getFieldType()) {
+ case DATE:
+ format = this.dateFormat;
+ break;
+ case TIME:
+ format = this.timeFormat;
+ break;
+ default:
+ format = this.timestampFormat;
+ }
+ returnValue = coerceStringToLong(
+ fieldName,
+ DataTypeUtils.toString(coercedValue, () ->
DataTypeUtils.getDateFormat(format))
+ );
+ break;
+ case LONG:
+ returnValue = DataTypeUtils.toLong(coercedValue,
fieldName);
+ break;
+ case INT:
+ case BYTE:
+ case SHORT:
+ returnValue = DataTypeUtils.toInteger(coercedValue,
fieldName);
+ break;
+ case CHAR:
+ case STRING:
+ returnValue = coerceStringToLong(fieldName,
coercedValue.toString());
+ break;
+ case BIGINT:
+ returnValue = coercedValue;
+ break;
+ default:
+ throw new ProcessException(
+ String.format("Cannot use %s field referenced by
%s as @timestamp.", chosenDataType.toString(), path.getPath())
+ );
+ }
- String retVal = fieldValue.getValue().toString();
+ if (!retain) {
+ fieldValue.updateValue(null);
+ }
- return retVal;
+ return returnValue;
} else {
return fallback;
Review comment:
I decided that doing the coercion was probably the sensible thing to be
consistent throughout the PutElasticsearchRecord processor and also because if
I specify an epoch timestamp as a FlowFile attribute, I'd want it to be output
as a Long rather than a String (although Elasticsearch is capable of doing this
coercion itself if we didn't do this in NiFi)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]