GitHub user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2619#discussion_r192802068
--- Diff:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
---
@@ -52,68 +54,125 @@
"The query is limited to the first result (findOne in the Mongo
documentation). If no \"Lookup Value Field\" is specified " +
"then the entire MongoDB result document minus the _id field will be
returned as a record."
)
-public class MongoDBLookupService extends MongoDBControllerService
implements LookupService<Object> {
+public class MongoDBLookupService extends SchemaRegistryService implements
LookupService<Object> {
+ public static final PropertyDescriptor CONTROLLER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("mongo-lookup-client-service")
+ .displayName("Client Service")
+ .description("A MongoDB controller service to use with this lookup
service.")
+ .required(true)
+ .identifiesControllerService(MongoDBControllerService.class)
+ .build();
public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new
PropertyDescriptor.Builder()
- .name("mongo-lookup-value-field")
- .displayName("Lookup Value Field")
- .description("The field whose value will be returned when the
lookup key(s) match a record. If not specified then the entire " +
- "MongoDB result document minus the _id field will be
returned as a record.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ .name("mongo-lookup-value-field")
+ .displayName("Lookup Value Field")
+ .description("The field whose value will be returned when the
lookup key(s) match a record. If not specified then the entire " +
+ "MongoDB result document minus the _id field will be
returned as a record.")
+ .addValidator(Validator.VALID)
+ .required(false)
+ .build();
+ public static final PropertyDescriptor PROJECTION = new
PropertyDescriptor.Builder()
+ .name("mongo-lookup-projection")
+ .displayName("Projection")
+ .description("Specifies a projection for limiting which fields
will be returned.")
+ .required(false)
+ .build();
private String lookupValueField;
- private static final List<PropertyDescriptor> lookupDescriptors;
-
- static {
- lookupDescriptors = new ArrayList<>();
- lookupDescriptors.addAll(descriptors);
- lookupDescriptors.add(LOOKUP_VALUE_FIELD);
- }
-
@Override
public Optional<Object> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
- Map<String, Object> clean = new HashMap<>();
- clean.putAll(coordinates);
+ Map<String, Object> clean = coordinates.entrySet().stream()
+ .filter(e -> !schemaNameProperty.equals(String.format("${%s}",
e.getKey())))
+ .collect(Collectors.toMap(
+ e -> e.getKey(),
+ e -> e.getValue()
+ ));
Document query = new Document(clean);
if (coordinates.size() == 0) {
throw new LookupFailureException("No keys were configured.
Mongo query would return random documents.");
}
try {
- Document result = this.findOne(query);
+ Document result = projection != null ?
controllerService.findOne(query, projection) : controllerService.findOne(query);
if(result == null) {
return Optional.empty();
} else if (!StringUtils.isEmpty(lookupValueField)) {
return Optional.ofNullable(result.get(lookupValueField));
} else {
- final List<RecordField> fields = new ArrayList<>();
+ RecordSchema schema = loadSchema(coordinates);
- for (String key : result.keySet()) {
- if (key.equals("_id")) {
- continue;
- }
- fields.add(new RecordField(key,
RecordFieldType.STRING.getDataType()));
- }
-
- final RecordSchema schema = new SimpleRecordSchema(fields);
- return Optional.ofNullable(new MapRecord(schema, result));
+ RecordSchema toUse = schema != null ? schema :
convertSchema(result);
+ return Optional.ofNullable(new MapRecord(toUse, result));
}
} catch (Exception ex) {
getLogger().error("Error during lookup {}", new Object[]{
query.toJson() }, ex);
throw new LookupFailureException(ex);
}
}
- @Override
+ private RecordSchema loadSchema(Map<String, Object> coordinates) {
+ Map<String, String> variables = coordinates.entrySet().stream()
+ .collect(Collectors.toMap(
+ e -> e.getKey(),
+ e -> e.getValue().toString()
+ ));
+ try {
+ return getSchema(variables, null);
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+
+ private RecordSchema convertSchema(Map<String, Object> result) {
+ List<RecordField> fields = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : result.entrySet()) {
+
+ RecordField field;
+ if (entry.getValue() instanceof Integer) {
+ field = new RecordField(entry.getKey(),
RecordFieldType.INT.getDataType());
+ } else if (entry.getValue() instanceof Long) {
+ field = new RecordField(entry.getKey(),
RecordFieldType.LONG.getDataType());
+ } else if (entry.getValue() instanceof Boolean) {
+ field = new RecordField(entry.getKey(),
RecordFieldType.BOOLEAN.getDataType());
+ } else if (entry.getValue() instanceof Double) {
+ field = new RecordField(entry.getKey(),
RecordFieldType.DOUBLE.getDataType());
+ } else if (entry.getValue() instanceof Date) {
+ field = new RecordField(entry.getKey(),
RecordFieldType.DATE.getDataType());
+ } else if (entry.getValue() instanceof List) {
+ field = new RecordField(entry.getKey(),
RecordFieldType.ARRAY.getDataType());
+ } else if (entry.getValue() instanceof Map) {
+ RecordSchema nestedSchema =
convertSchema((Map)entry.getValue());
+ RecordDataType rdt = new RecordDataType(nestedSchema);
+ field = new RecordField(entry.getKey(), rdt);
+ } else {
+ field = new RecordField(entry.getKey(),
RecordFieldType.STRING.getDataType());
+ }
+ fields.add(field);
+ }
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private volatile Document projection;
+ private MongoDBControllerService controllerService;
+ private String schemaNameProperty;
+
@OnEnabled
- public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
+ public void onEnabled(final ConfigurationContext context) {
this.lookupValueField =
context.getProperty(LOOKUP_VALUE_FIELD).getValue();
- super.onEnabled(context);
+ this.controllerService =
context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBControllerService.class);
--- End diff --
Fixed.
---