Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2619#discussion_r193406221
  
    --- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
 ---
    @@ -52,68 +54,125 @@
         "The query is limited to the first result (findOne in the Mongo 
documentation). If no \"Lookup Value Field\" is specified " +
         "then the entire MongoDB result document minus the _id field will be 
returned as a record."
     )
    -public class MongoDBLookupService extends MongoDBControllerService 
implements LookupService<Object> {
    +public class MongoDBLookupService extends SchemaRegistryService implements 
LookupService<Object> {
    +    public static final PropertyDescriptor CONTROLLER_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("mongo-lookup-client-service")
    +        .displayName("Client Service")
    +        .description("A MongoDB controller service to use with this lookup 
service.")
    +        .required(true)
    +        .identifiesControllerService(MongoDBClientService.class)
    +        .build();
     
         public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new 
PropertyDescriptor.Builder()
    -            .name("mongo-lookup-value-field")
    -            .displayName("Lookup Value Field")
    -            .description("The field whose value will be returned when the 
lookup key(s) match a record. If not specified then the entire " +
    -                    "MongoDB result document minus the _id field will be 
returned as a record.")
    -            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    -            .required(false)
    -            .build();
    +        .name("mongo-lookup-value-field")
    +        .displayName("Lookup Value Field")
    +        .description("The field whose value will be returned when the 
lookup key(s) match a record. If not specified then the entire " +
    +                "MongoDB result document minus the _id field will be 
returned as a record.")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +    public static final PropertyDescriptor PROJECTION = new 
PropertyDescriptor.Builder()
    +        .name("mongo-lookup-projection")
    +        .displayName("Projection")
    +        .description("Specifies a projection for limiting which fields 
will be returned.")
    +        .required(false)
    +        .build();
     
         private String lookupValueField;
     
    -    private static final List<PropertyDescriptor> lookupDescriptors;
    -
    -    static {
    -        lookupDescriptors = new ArrayList<>();
    -        lookupDescriptors.addAll(descriptors);
    -        lookupDescriptors.add(LOOKUP_VALUE_FIELD);
    -    }
    -
         @Override
         public Optional<Object> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
    -        Map<String, Object> clean = new HashMap<>();
    -        clean.putAll(coordinates);
    +        Map<String, Object> clean = coordinates.entrySet().stream()
    +            .filter(e -> !schemaNameProperty.equals(String.format("${%s}", 
e.getKey())))
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue()
    +            ));
             Document query = new Document(clean);
     
             if (coordinates.size() == 0) {
                 throw new LookupFailureException("No keys were configured. 
Mongo query would return random documents.");
             }
     
             try {
    -            Document result = this.findOne(query);
    +            Document result = projection != null ? 
controllerService.findOne(query, projection) : controllerService.findOne(query);
     
                 if(result == null) {
                     return Optional.empty();
                 } else if (!StringUtils.isEmpty(lookupValueField)) {
                     return Optional.ofNullable(result.get(lookupValueField));
                 } else {
    -                final List<RecordField> fields = new ArrayList<>();
    +                RecordSchema schema = loadSchema(coordinates);
     
    -                for (String key : result.keySet()) {
    -                    if (key.equals("_id")) {
    -                        continue;
    -                    }
    -                    fields.add(new RecordField(key, 
RecordFieldType.STRING.getDataType()));
    -                }
    -
    -                final RecordSchema schema = new SimpleRecordSchema(fields);
    -                return Optional.ofNullable(new MapRecord(schema, result));
    +                RecordSchema toUse = schema != null ? schema : 
convertSchema(result);
    --- End diff --
    
    One thing you could do is override getSupportedPropertyDescriptors() and 
add your own Schema Access Strategy property that offers only the relevant 
strategies, including a strategy of your own such as "Infer Schema From Document".


---

Reply via email to