Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2619#discussion_r193480390
--- Diff:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
---
@@ -52,68 +54,125 @@
"The query is limited to the first result (findOne in the Mongo
documentation). If no \"Lookup Value Field\" is specified " +
"then the entire MongoDB result document minus the _id field will be
returned as a record."
)
-public class MongoDBLookupService extends MongoDBControllerService
implements LookupService<Object> {
+public class MongoDBLookupService extends SchemaRegistryService implements
LookupService<Object> {
+ public static final PropertyDescriptor CONTROLLER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("mongo-lookup-client-service")
+ .displayName("Client Service")
+ .description("A MongoDB controller service to use with this lookup
service.")
+ .required(true)
+ .identifiesControllerService(MongoDBClientService.class)
+ .build();
public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new
PropertyDescriptor.Builder()
- .name("mongo-lookup-value-field")
- .displayName("Lookup Value Field")
- .description("The field whose value will be returned when the
lookup key(s) match a record. If not specified then the entire " +
- "MongoDB result document minus the _id field will be
returned as a record.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ .name("mongo-lookup-value-field")
+ .displayName("Lookup Value Field")
+ .description("The field whose value will be returned when the
lookup key(s) match a record. If not specified then the entire " +
+ "MongoDB result document minus the _id field will be
returned as a record.")
+ .addValidator(Validator.VALID)
+ .required(false)
+ .build();
+ public static final PropertyDescriptor PROJECTION = new
PropertyDescriptor.Builder()
+ .name("mongo-lookup-projection")
+ .displayName("Projection")
+ .description("Specifies a projection for limiting which fields
will be returned.")
+ .required(false)
+ .build();
private String lookupValueField;
- private static final List<PropertyDescriptor> lookupDescriptors;
-
- static {
- lookupDescriptors = new ArrayList<>();
- lookupDescriptors.addAll(descriptors);
- lookupDescriptors.add(LOOKUP_VALUE_FIELD);
- }
-
@Override
public Optional<Object> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
- Map<String, Object> clean = new HashMap<>();
- clean.putAll(coordinates);
+ Map<String, Object> clean = coordinates.entrySet().stream()
+ .filter(e -> !schemaNameProperty.equals(String.format("${%s}",
e.getKey())))
+ .collect(Collectors.toMap(
+ e -> e.getKey(),
+ e -> e.getValue()
+ ));
Document query = new Document(clean);
if (coordinates.size() == 0) {
throw new LookupFailureException("No keys were configured.
Mongo query would return random documents.");
}
try {
- Document result = this.findOne(query);
+ Document result = projection != null ?
controllerService.findOne(query, projection) : controllerService.findOne(query);
if(result == null) {
return Optional.empty();
} else if (!StringUtils.isEmpty(lookupValueField)) {
return Optional.ofNullable(result.get(lookupValueField));
} else {
- final List<RecordField> fields = new ArrayList<>();
+ RecordSchema schema = loadSchema(coordinates);
- for (String key : result.keySet()) {
- if (key.equals("_id")) {
- continue;
- }
- fields.add(new RecordField(key,
RecordFieldType.STRING.getDataType()));
- }
-
- final RecordSchema schema = new SimpleRecordSchema(fields);
- return Optional.ofNullable(new MapRecord(schema, result));
+ RecordSchema toUse = schema != null ? schema :
convertSchema(result);
--- End diff --
That's probably the right way to do it because we should have it blow up if
it can't get the schema on the first pass instead of silently falling back onto
the inference option. Once I get that worked out, I'll copy pasta it over the
ES one as well.
---