Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2619#discussion_r192802068
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java ---
    @@ -52,68 +54,125 @@
         "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " +
         "then the entire MongoDB result document minus the _id field will be returned as a record."
     )
    -public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> {
    +public class MongoDBLookupService extends SchemaRegistryService implements LookupService<Object> {
    +    public static final PropertyDescriptor CONTROLLER_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("mongo-lookup-client-service")
    +        .displayName("Client Service")
    +        .description("A MongoDB controller service to use with this lookup 
service.")
    +        .required(true)
    +        .identifiesControllerService(MongoDBControllerService.class)
    +        .build();
     
         public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new 
PropertyDescriptor.Builder()
    -            .name("mongo-lookup-value-field")
    -            .displayName("Lookup Value Field")
    -            .description("The field whose value will be returned when the 
lookup key(s) match a record. If not specified then the entire " +
    -                    "MongoDB result document minus the _id field will be 
returned as a record.")
    -            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    -            .required(false)
    -            .build();
    +        .name("mongo-lookup-value-field")
    +        .displayName("Lookup Value Field")
    +        .description("The field whose value will be returned when the 
lookup key(s) match a record. If not specified then the entire " +
    +                "MongoDB result document minus the _id field will be 
returned as a record.")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +    public static final PropertyDescriptor PROJECTION = new 
PropertyDescriptor.Builder()
    +        .name("mongo-lookup-projection")
    +        .displayName("Projection")
    +        .description("Specifies a projection for limiting which fields 
will be returned.")
    +        .required(false)
    +        .build();
     
         private String lookupValueField;
     
    -    private static final List<PropertyDescriptor> lookupDescriptors;
    -
    -    static {
    -        lookupDescriptors = new ArrayList<>();
    -        lookupDescriptors.addAll(descriptors);
    -        lookupDescriptors.add(LOOKUP_VALUE_FIELD);
    -    }
    -
         @Override
         public Optional<Object> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
    -        Map<String, Object> clean = new HashMap<>();
    -        clean.putAll(coordinates);
    +        Map<String, Object> clean = coordinates.entrySet().stream()
    +            .filter(e -> !schemaNameProperty.equals(String.format("${%s}", 
e.getKey())))
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue()
    +            ));
             Document query = new Document(clean);
     
             if (coordinates.size() == 0) {
                 throw new LookupFailureException("No keys were configured. 
Mongo query would return random documents.");
             }
     
             try {
    -            Document result = this.findOne(query);
    +            Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query);
     
                 if(result == null) {
                     return Optional.empty();
                 } else if (!StringUtils.isEmpty(lookupValueField)) {
                     return Optional.ofNullable(result.get(lookupValueField));
                 } else {
    -                final List<RecordField> fields = new ArrayList<>();
    +                RecordSchema schema = loadSchema(coordinates);
     
    -                for (String key : result.keySet()) {
    -                    if (key.equals("_id")) {
    -                        continue;
    -                    }
    -                    fields.add(new RecordField(key, 
RecordFieldType.STRING.getDataType()));
    -                }
    -
    -                final RecordSchema schema = new SimpleRecordSchema(fields);
    -                return Optional.ofNullable(new MapRecord(schema, result));
    +                RecordSchema toUse = schema != null ? schema : 
convertSchema(result);
    +                return Optional.ofNullable(new MapRecord(toUse, result));
                 }
             } catch (Exception ex) {
                 getLogger().error("Error during lookup {}", new Object[]{ 
query.toJson() }, ex);
                 throw new LookupFailureException(ex);
             }
         }
     
    -    @Override
    +    private RecordSchema loadSchema(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (Exception ex) {
    +            return null;
    +        }
    +    }
    +
    +    private RecordSchema convertSchema(Map<String, Object> result) {
    +        List<RecordField> fields = new ArrayList<>();
    +        for (Map.Entry<String, Object> entry : result.entrySet()) {
    +
    +            RecordField field;
    +            if (entry.getValue() instanceof Integer) {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.INT.getDataType());
    +            } else if (entry.getValue() instanceof Long) {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.LONG.getDataType());
    +            } else if (entry.getValue() instanceof Boolean) {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.BOOLEAN.getDataType());
    +            } else if (entry.getValue() instanceof Double) {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.DOUBLE.getDataType());
    +            } else if (entry.getValue() instanceof Date) {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.DATE.getDataType());
    +            } else if (entry.getValue() instanceof List) {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.ARRAY.getDataType());
    +            } else if (entry.getValue() instanceof Map) {
    +                RecordSchema nestedSchema = 
convertSchema((Map)entry.getValue());
    +                RecordDataType rdt = new RecordDataType(nestedSchema);
    +                field = new RecordField(entry.getKey(), rdt);
    +            } else {
    +                field = new RecordField(entry.getKey(), 
RecordFieldType.STRING.getDataType());
    +            }
    +            fields.add(field);
    +        }
    +
    +        return new SimpleRecordSchema(fields);
    +    }
    +
    +    private volatile Document projection;
    +    private MongoDBControllerService controllerService;
    +    private String schemaNameProperty;
    +
         @OnEnabled
    -    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
    +    public void onEnabled(final ConfigurationContext context) {
             this.lookupValueField = 
context.getProperty(LOOKUP_VALUE_FIELD).getValue();
    -        super.onEnabled(context);
    +        this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBControllerService.class);
    --- End diff --
    
    Fixed.


---

Reply via email to