Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2619#discussion_r193480390
  
    --- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
 ---
    @@ -52,68 +54,125 @@
         "The query is limited to the first result (findOne in the Mongo 
documentation). If no \"Lookup Value Field\" is specified " +
         "then the entire MongoDB result document minus the _id field will be 
returned as a record."
     )
    -public class MongoDBLookupService extends MongoDBControllerService 
implements LookupService<Object> {
    +public class MongoDBLookupService extends SchemaRegistryService implements 
LookupService<Object> {
    +    public static final PropertyDescriptor CONTROLLER_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("mongo-lookup-client-service")
    +        .displayName("Client Service")
    +        .description("A MongoDB controller service to use with this lookup 
service.")
    +        .required(true)
    +        .identifiesControllerService(MongoDBClientService.class)
    +        .build();
     
         public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new 
PropertyDescriptor.Builder()
    -            .name("mongo-lookup-value-field")
    -            .displayName("Lookup Value Field")
    -            .description("The field whose value will be returned when the 
lookup key(s) match a record. If not specified then the entire " +
    -                    "MongoDB result document minus the _id field will be 
returned as a record.")
    -            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    -            .required(false)
    -            .build();
    +        .name("mongo-lookup-value-field")
    +        .displayName("Lookup Value Field")
    +        .description("The field whose value will be returned when the 
lookup key(s) match a record. If not specified then the entire " +
    +                "MongoDB result document minus the _id field will be 
returned as a record.")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +    public static final PropertyDescriptor PROJECTION = new 
PropertyDescriptor.Builder()
    +        .name("mongo-lookup-projection")
    +        .displayName("Projection")
    +        .description("Specifies a projection for limiting which fields 
will be returned.")
    +        .required(false)
    +        .build();
     
         private String lookupValueField;
     
    -    private static final List<PropertyDescriptor> lookupDescriptors;
    -
    -    static {
    -        lookupDescriptors = new ArrayList<>();
    -        lookupDescriptors.addAll(descriptors);
    -        lookupDescriptors.add(LOOKUP_VALUE_FIELD);
    -    }
    -
         @Override
         public Optional<Object> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
    -        Map<String, Object> clean = new HashMap<>();
    -        clean.putAll(coordinates);
    +        Map<String, Object> clean = coordinates.entrySet().stream()
    +            .filter(e -> !schemaNameProperty.equals(String.format("${%s}", 
e.getKey())))
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue()
    +            ));
             Document query = new Document(clean);
     
             if (coordinates.size() == 0) {
                 throw new LookupFailureException("No keys were configured. 
Mongo query would return random documents.");
             }
     
             try {
    -            Document result = this.findOne(query);
    +            Document result = projection != null ? 
controllerService.findOne(query, projection) : controllerService.findOne(query);
     
                 if(result == null) {
                     return Optional.empty();
                 } else if (!StringUtils.isEmpty(lookupValueField)) {
                     return Optional.ofNullable(result.get(lookupValueField));
                 } else {
    -                final List<RecordField> fields = new ArrayList<>();
    +                RecordSchema schema = loadSchema(coordinates);
     
    -                for (String key : result.keySet()) {
    -                    if (key.equals("_id")) {
    -                        continue;
    -                    }
    -                    fields.add(new RecordField(key, 
RecordFieldType.STRING.getDataType()));
    -                }
    -
    -                final RecordSchema schema = new SimpleRecordSchema(fields);
    -                return Optional.ofNullable(new MapRecord(schema, result));
    +                RecordSchema toUse = schema != null ? schema : 
convertSchema(result);
    --- End diff --
    
    That's probably the right way to do it because we should have it blow up if 
it can't get the schema on the first pass instead of silently falling back onto 
the inference option. Once I get that worked out, I'll copy pasta it over the 
ES one as well.


---

Reply via email to