Repository: nifi Updated Branches: refs/heads/master 20a6374bf -> bfd6c0aef
NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included Signed-off-by: James Wing <[email protected]> This closes #2110. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bfd6c0ae Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bfd6c0ae Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bfd6c0ae Branch: refs/heads/master Commit: bfd6c0aef768f53be8fdea62afdcd5404099e089 Parents: 20a6374 Author: Mark Payne <[email protected]> Authored: Fri Aug 25 16:22:43 2017 -0400 Committer: James Wing <[email protected]> Committed: Tue Sep 5 21:29:16 2017 -0700 ---------------------------------------------------------------------- .../nifi/processors/standard/LookupRecord.java | 56 +++++- .../processors/standard/TestLookupRecord.java | 173 +++++++++++++++++++ 2 files changed, 224 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 10539bc..286f7ee 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -95,6 +95,11 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<Map<String, RecordPa "Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. " + "A single input FlowFile may result in two different output FlowFiles."); + static final AllowableValue RESULT_ENTIRE_RECORD = new AllowableValue("insert-entire-record", "Insert Entire Record", + "The entire Record that is retrieved from the Lookup Service will be inserted into the destination path."); + static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields", + "All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path."); + static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder() .name("lookup-service") .displayName("Lookup Service") @@ -114,6 +119,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa .required(false) .build(); + static final PropertyDescriptor RESULT_CONTENTS = new PropertyDescriptor.Builder() + .name("result-contents") + .displayName("Record Result Contents") + .description("When a result is obtained that contains a Record, this property determines whether the Record itself is inserted at the configured " + + "path or if the contents of the Record (i.e., the sub-fields) will be inserted at the configured path.") + .allowableValues(RESULT_ENTIRE_RECORD, RESULT_RECORD_FIELDS) + .defaultValue(RESULT_ENTIRE_RECORD.getValue()) + .required(true) + .build(); + static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder() .name("routing-strategy") .displayName("Routing Strategy") @@ -161,6 +176,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa properties.add(LOOKUP_SERVICE); properties.add(RESULT_RECORD_PATH); properties.add(ROUTING_STRATEGY); + properties.add(RESULT_CONTENTS); return properties; } @@ -272,14 +288,14 @@ 
public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa lookupCoordinates.put(coordinateKey, coordinateValue); } - final Optional<?> lookupValue; + final Optional<?> lookupValueOption; try { - lookupValue = lookupService.lookup(lookupCoordinates); + lookupValueOption = lookupService.lookup(lookupCoordinates); } catch (final Exception e) { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } - if (!lookupValue.isPresent()) { + if (!lookupValueOption.isPresent()) { final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; return rels; } @@ -289,9 +305,39 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa if (resultPath != null) { record.incorporateSchema(writeSchema); - final Object replacementValue = lookupValue.get(); + final Object lookupValue = lookupValueOption.get(); final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record); - resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue)); + + final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue(); + if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) { + final Record lookupRecord = (Record) lookupValue; + + // User wants to add all fields of the resultant Record to the specified Record Path. + // If the destination Record Path returns to us a Record, then we will add all field values of + // the Lookup Record to the destination Record. However, if the destination Record Path returns + // something other than a Record, then we can't add the fields to it. We can only replace it, + // because it doesn't make sense to add fields to anything but a Record.
+ resultPathResult.getSelectedFields().forEach(fieldVal -> { + final Object destinationValue = fieldVal.getValue(); + + if (destinationValue instanceof Record) { + final Record destinationRecord = (Record) destinationValue; + + for (final String fieldName : lookupRecord.getRawFieldNames()) { + final Object value = lookupRecord.getValue(fieldName); + destinationRecord.setValue(fieldName, value); + } + } else { + final Optional<Record> parentOption = fieldVal.getParentRecord(); + + if (parentOption.isPresent()) { + parentOption.get().setValue(fieldVal.getField().getFieldName(), lookupRecord); + } + } + }); + } else { + resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue)); + } } final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index b84f518..29966e7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -17,19 +17,30 @@ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import 
java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -227,6 +238,137 @@ public class TestLookupRecord { } + @Test + public void testAddFieldsToExistingRecord() throws InitializationException, IOException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap<String, Object>()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("favorite", RecordFieldType.STRING); + recordReader.addSchemaField("least", RecordFieldType.STRING); + + 
recordReader.addRecord("John Doe", 48, null, "baseball"); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + out.assertContentEquals("John Doe,48,basketball,soccer\n"); + } + + /** + * If the output fields are added to a record that doesn't exist, the result should be that a Record is + * created and the results added to it. + */ + @Test + public void testAddFieldsToNonExistentRecord() throws InitializationException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap<String, Object>()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.RECORD); + + recordReader.addRecord("John Doe", 48, null); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, 
LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + + // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first + final String outputContents = new String(out.toByteArray()); + assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n") + || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n")); + } + + /** + * If the output fields are added to a non-record field, then the result should be that the field + * becomes a UNION that does allow the Record and the value is set to a Record. + */ + @Test + public void testAddFieldsToNonRecordField() throws InitializationException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap<String, Object>()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, null); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); + 
runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + + // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first + final String outputContents = new String(out.toByteArray()); + assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n") + || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n")); + } + private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map<String, String> values = new HashMap<>(); @@ -260,4 +402,35 @@ public class TestLookupRecord { } } + private static class RecordLookup extends AbstractControllerService implements RecordLookupService { + private final Map<String, Record> values = new HashMap<>(); + + public void addValue(final String key, final Record value) { + values.put(key, value); + } + + @Override + public Class<?> getValueType() { + return String.class; + } + + @Override + public Optional<Record> lookup(final Map<String, String> coordinates) { + if (coordinates == null) { + return Optional.empty(); + } + + final String key = coordinates.get("lookup"); + if (key == null) { + return Optional.empty(); + } + + return Optional.ofNullable(values.get(key)); + } + + @Override + public Set<String> getRequiredKeys() { + return Collections.singleton("lookup"); + } + } }
