Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2619#discussion_r193480390 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java --- @@ -52,68 +54,125 @@ "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " + "then the entire MongoDB result document minus the _id field will be returned as a record." ) -public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> { +public class MongoDBLookupService extends SchemaRegistryService implements LookupService<Object> { + public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder() + .name("mongo-lookup-client-service") + .displayName("Client Service") + .description("A MongoDB controller service to use with this lookup service.") + .required(true) + .identifiesControllerService(MongoDBClientService.class) + .build(); public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder() - .name("mongo-lookup-value-field") - .displayName("Lookup Value Field") - .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + - "MongoDB result document minus the _id field will be returned as a record.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .build(); + .name("mongo-lookup-value-field") + .displayName("Lookup Value Field") + .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + + "MongoDB result document minus the _id field will be returned as a record.") + .addValidator(Validator.VALID) + .required(false) + .build(); + public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() + .name("mongo-lookup-projection") + .displayName("Projection") + .description("Specifies a projection for limiting which fields will be returned.") + .required(false) + .build(); private String lookupValueField; - private static final List<PropertyDescriptor> lookupDescriptors; - - static { - lookupDescriptors = new ArrayList<>(); - lookupDescriptors.addAll(descriptors); - lookupDescriptors.add(LOOKUP_VALUE_FIELD); - } - @Override public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException { - Map<String, Object> clean = new HashMap<>(); - clean.putAll(coordinates); + Map<String, Object> clean = coordinates.entrySet().stream() + .filter(e -> !schemaNameProperty.equals(String.format("${%s}", e.getKey()))) + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue() + )); Document query = new Document(clean); if (coordinates.size() == 0) { throw new LookupFailureException("No keys were configured. Mongo query would return random documents."); } try { - Document result = this.findOne(query); + Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query); if(result == null) { return Optional.empty(); } else if (!StringUtils.isEmpty(lookupValueField)) { return Optional.ofNullable(result.get(lookupValueField)); } else { - final List<RecordField> fields = new ArrayList<>(); + RecordSchema schema = loadSchema(coordinates); - for (String key : result.keySet()) { - if (key.equals("_id")) { - continue; - } - fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); - } - - final RecordSchema schema = new SimpleRecordSchema(fields); - return Optional.ofNullable(new MapRecord(schema, result)); + RecordSchema toUse = schema != null ? schema : convertSchema(result); --- End diff -- That's probably the right way to do it because we should have it blow up if it can't get the schema on the first pass instead of silently falling back onto the inference option. Once I get that worked out, I'll copy pasta it over the ES one as well.
---