Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2619#discussion_r192841468 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java --- @@ -52,68 +54,125 @@ "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " + "then the entire MongoDB result document minus the _id field will be returned as a record." ) -public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> { +public class MongoDBLookupService extends SchemaRegistryService implements LookupService<Object> { + public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder() + .name("mongo-lookup-client-service") + .displayName("Client Service") + .description("A MongoDB controller service to use with this lookup service.") + .required(true) + .identifiesControllerService(MongoDBControllerService.class) + .build(); public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder() - .name("mongo-lookup-value-field") - .displayName("Lookup Value Field") - .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + - "MongoDB result document minus the _id field will be returned as a record.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .build(); + .name("mongo-lookup-value-field") + .displayName("Lookup Value Field") + .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + + "MongoDB result document minus the _id field will be returned as a record.") + .addValidator(Validator.VALID) + .required(false) + .build(); + public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() + .name("mongo-lookup-projection") + .displayName("Projection") + .description("Specifies a projection for limiting which fields will be returned.") + .required(false) + .build(); private String lookupValueField; - private static final List<PropertyDescriptor> lookupDescriptors; - - static { - lookupDescriptors = new ArrayList<>(); - lookupDescriptors.addAll(descriptors); - lookupDescriptors.add(LOOKUP_VALUE_FIELD); - } - @Override public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException { - Map<String, Object> clean = new HashMap<>(); - clean.putAll(coordinates); + Map<String, Object> clean = coordinates.entrySet().stream() + .filter(e -> !schemaNameProperty.equals(String.format("${%s}", e.getKey()))) + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue() + )); Document query = new Document(clean); if (coordinates.size() == 0) { throw new LookupFailureException("No keys were configured. Mongo query would return random documents."); } try { - Document result = this.findOne(query); + Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query); if(result == null) { return Optional.empty(); } else if (!StringUtils.isEmpty(lookupValueField)) { return Optional.ofNullable(result.get(lookupValueField)); } else { - final List<RecordField> fields = new ArrayList<>(); + RecordSchema schema = loadSchema(coordinates); - for (String key : result.keySet()) { - if (key.equals("_id")) { - continue; - } - fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); - } - - final RecordSchema schema = new SimpleRecordSchema(fields); - return Optional.ofNullable(new MapRecord(schema, result)); + RecordSchema toUse = schema != null ? schema : convertSchema(result); + return Optional.ofNullable(new MapRecord(toUse, result)); } } catch (Exception ex) { getLogger().error("Error during lookup {}", new Object[]{ query.toJson() }, ex); throw new LookupFailureException(ex); } } - @Override + private RecordSchema loadSchema(Map<String, Object> coordinates) { + Map<String, String> variables = coordinates.entrySet().stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue().toString() + )); + try { + return getSchema(variables, null); + } catch (Exception ex) { + return null; + } + } + + private RecordSchema convertSchema(Map<String, Object> result) { + List<RecordField> fields = new ArrayList<>(); + for (Map.Entry<String, Object> entry : result.entrySet()) { + + RecordField field; + if (entry.getValue() instanceof Integer) { + field = new RecordField(entry.getKey(), RecordFieldType.INT.getDataType()); + } else if (entry.getValue() instanceof Long) { + field = new RecordField(entry.getKey(), RecordFieldType.LONG.getDataType()); + } else if (entry.getValue() instanceof Boolean) { + field = new RecordField(entry.getKey(), RecordFieldType.BOOLEAN.getDataType()); + } else if (entry.getValue() instanceof Double) { + field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType()); + } else if (entry.getValue() instanceof Date) { + field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType()); + } else if (entry.getValue() instanceof List) { + field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType()); + } else if (entry.getValue() instanceof Map) { + RecordSchema nestedSchema = convertSchema((Map)entry.getValue()); + RecordDataType rdt = new RecordDataType(nestedSchema); + field = new RecordField(entry.getKey(), rdt); + } else { + field = new RecordField(entry.getKey(), RecordFieldType.STRING.getDataType()); + } + fields.add(field); + } + + return new SimpleRecordSchema(fields); + } + + private volatile Document projection; + private MongoDBControllerService controllerService; + private String schemaNameProperty; + @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + public void onEnabled(final ConfigurationContext context) { this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue(); - super.onEnabled(context); + this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBControllerService.class); --- End diff -- Looks like this one is still there, still getting the runtime errors
---