This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c82c1db628c4f8191fb965f1ada821c1bb3fbc55 Author: Pierre Villard <[email protected]> AuthorDate: Tue Feb 25 08:53:36 2020 -0800 NIFI-7197 - In-place replacement in LookupRecord processor This closes #4088 Signed-off-by: Mark Payne <[email protected]> --- .../nifi-standard-processors/pom.xml | 2 + .../nifi/processors/standard/LookupRecord.java | 126 ++++++++++-- .../additionalDetails.html | 215 +++++++++++++++++++++ .../nifi/processors/standard/TestLookupRecord.java | 89 +++++++++ .../TestLookupRecord/lookup-array-input.json | 29 +++ .../TestLookupRecord/lookup-array-output.json | 1 + 6 files changed, 446 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2cd98d6..896c85f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -581,6 +581,8 @@ <exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude> <exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude> <exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude> + <exclude>src/test/resources/TestLookupRecord/lookup-array-input.json</exclude> + <exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude> </excludes> </configuration> </plugin> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 23d1325..28705cc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -105,6 +105,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields", "All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path."); + static final AllowableValue USE_PROPERTY = new AllowableValue("use-property", "Use Property", + "The \"Result RecordPath\" property will be used to determine which part of the record should be updated with the value returned by the Lookup Service"); + static final AllowableValue REPLACE_EXISTING_VALUES = new AllowableValue("replace-existing-values", "Replace Existing Values", + "The \"Result RecordPath\" property will be ignored and the lookup service must be a single simple key lookup service. Every dynamic property value should " + + "be a record path. For each dynamic property, the value contained in the field corresponding to the record path will be used as the key in the Lookup " + + "Service and the value returned by the Lookup Service will be used to replace the existing value. It is possible to configure multiple dynamic properties " + + "to replace multiple values in one execution. 
This strategy only supports simple types replacements (strings, integers, etc)."); + static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder() .name("lookup-service") .displayName("Lookup Service") @@ -144,6 +152,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa .required(true) .build(); + static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder() + .name("record-update-strategy") + .displayName("Record Update Strategy") + .description("This property defines the strategy to use when updating the record with the value returned by the Lookup Service.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(REPLACE_EXISTING_VALUES, USE_PROPERTY) + .defaultValue(USE_PROPERTY.getValue()) + .required(true) + .build(); + static final Relationship REL_MATCHED = new Relationship.Builder() .name("matched") .description("All records for which the lookup returns a value will be routed to this relationship") @@ -182,6 +200,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa properties.add(RESULT_RECORD_PATH); properties.add(ROUTING_STRATEGY); properties.add(RESULT_CONTENTS); + properties.add(REPLACEMENT_STRATEGY); return properties; } @@ -214,24 +233,37 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa } final Set<String> requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys(); - final Set<String> missingKeys = requiredKeys.stream() - .filter(key -> !dynamicPropNames.contains(key)) - .collect(Collectors.toSet()); - if (!missingKeys.isEmpty()) { - final List<ValidationResult> validationResults = new ArrayList<>(); - for (final String missingKey : missingKeys) { - final ValidationResult result = new ValidationResult.Builder() - .subject(missingKey) - .valid(false) - .explanation("The configured Lookup Services requires that a key be 
provided with the name '" + missingKey - + "'. Please add a new property to this Processor with a name '" + missingKey - + "' and provide a RecordPath that can be used to retrieve the appropriate value.") - .build(); - validationResults.add(result); + if(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue())) { + // it must be a single key lookup service + if(requiredKeys.size() != 1) { + return Collections.singleton(new ValidationResult.Builder() + .subject(LOOKUP_SERVICE.getDisplayName()) + .valid(false) + .explanation("When using \"" + REPLACE_EXISTING_VALUES.getDisplayName() + "\" as Record Update Strategy, " + + "only a Lookup Service requiring a single key can be used.") + .build()); + } + } else { + final Set<String> missingKeys = requiredKeys.stream() + .filter(key -> !dynamicPropNames.contains(key)) + .collect(Collectors.toSet()); + + if (!missingKeys.isEmpty()) { + final List<ValidationResult> validationResults = new ArrayList<>(); + for (final String missingKey : missingKeys) { + final ValidationResult result = new ValidationResult.Builder() + .subject(missingKey) + .valid(false) + .explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey + + "'. 
Please add a new property to this Processor with a name '" + missingKey + + "' and provide a RecordPath that can be used to retrieve the appropriate value.") + .build(); + validationResults.add(result); + } + + return validationResults; } - - return validationResults; } return Collections.emptyList(); @@ -263,6 +295,68 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) { + final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue()); + + if(isInPlaceReplacement) { + return doInPlaceReplacement(record, flowFile, context, flowFileContext); + } else { + return doResultPathReplacement(record, flowFile, context, flowFileContext); + } + + } + + private Set<Relationship> doInPlaceReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) { + + final String lookupKey = (String) context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys().iterator().next(); + + final Map<String, RecordPath> recordPaths = flowFileContext.getKey(); + final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size()); + + for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) { + final String coordinateKey = entry.getKey(); + final RecordPath recordPath = entry.getValue(); + + final RecordPathResult pathResult = recordPath.evaluate(record); + final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields() + .filter(fieldVal -> fieldVal.getValue() != null) + .collect(Collectors.toList()); + + if (lookupFieldValues.isEmpty()) { + final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels}); + return rels; + } + + for (FieldValue fieldValue : lookupFieldValues) { + final Object coordinateValue = (fieldValue.getValue() instanceof Number || fieldValue.getValue() instanceof Boolean) + ? fieldValue.getValue() : DataTypeUtils.toString(fieldValue.getValue(), (String) null); + lookupCoordinates.put(lookupKey, coordinateValue); + + final Optional<?> lookupValueOption; + try { + lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes()); + } catch (final Exception e) { + throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); + } + + if (!lookupValueOption.isPresent()) { + final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + return rels; + } + + final Object lookupValue = lookupValueOption.get(); + + final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType()); + fieldValue.updateValue(lookupValue, inferredDataType); + + } + } + + final Set<Relationship> rels = routeToMatchedUnmatched ? 
MATCHED_COLLECTION : SUCCESS_COLLECTION; + return rels; + } + + private Set<Relationship> doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) { final Map<String, RecordPath> recordPaths = flowFileContext.getKey(); final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html new file mode 100644 index 0000000..df83708 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html @@ -0,0 +1,215 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <head> + <meta charset="utf-8" /> + <title>LookupRecord</title> + + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + LookupRecord makes use of the NiFi <a href="../../../../../html/record-path-guide.html"> + RecordPath Domain-Specific Language (DSL)</a> to allow the user to indicate which field(s) + in the Record should be updated, depending on the Record Update Strategy. The Record will + be updated using the value returned by the provided Lookup Service. + </p> + + <h3>Record Update Strategy - Use Property</h3> + + <p> + In this case, the user should add, to the Processor's configuration, as many User-defined + Properties as required by the Lookup Service to form the lookup coordinates. The names of + the properties should match the names expected by the Lookup Service. + </p> + + <p> + The field evaluated using the path configured in the "Result RecordPath" property will be + the field updated with the value returned by the Lookup Service. + </p> + + <p> + Let's assume a Simple Key Value Lookup Service containing the following key/value pairs: + </p> + +<code> +<pre> +FR => France +CA => Canada +</pre> +</code> + + <p> + Let's assume the following JSON with three records as input: + </p> + +<code> +<pre> +[ + { + "country": null, + "code": "FR" + }, { + "country": null, + "code": "CA" + }, { + "country": null, + "code": "JP" + } +] +</pre> +</code> + + <p> + The processor is configured with "Use Property" as "Record Update Strategy", the "Result + RecordPath" is configured with "/country" and a user-defined property is added with the + name "key" (as required by this Lookup Service) and the value "/code". + </p> + + <p> + When triggered, the processor will look for the value associated with the "/code" path and + will use the value as the "key" of the Lookup Service. The value returned by the Lookup + Service will be used to update the value corresponding to "/country".
With the above + examples, it will produce: + </p> + +<code> +<pre> +[ + { + "country": "France", + "code": "FR" + }, { + "country": "Canada", + "code": "CA" + }, { + "country": null, + "code": "JP" + } +] +</pre> +</code> + + <h3>Record Update Strategy - Replace Existing Values</h3> + + <p> + With this strategy, the "Result RecordPath" property will be ignored and the configured Lookup + Service must be a lookup service requiring a single key. For each user-defined property, the value + contained in the field corresponding to the record path will be used as the key in the Lookup + Service and will be replaced by the value returned by the Lookup Service. It is possible to + configure multiple dynamic properties to update multiple fields in one execution. This strategy + only supports replacing values of simple types (strings, integers, etc.). + </p> + + <p> + Since this strategy allows in-place replacement, it is possible to use Record Paths for fields + contained in arrays. + </p> + + <p> + Let's assume a Simple Key Value Lookup Service containing the following key/value pairs: + </p> + +<code> +<pre> +FR => France +CA => Canada +fr => French +en => English +</pre> +</code> + + <p> + Let's assume the following JSON with two records as input: + </p> + +<code> +<pre> +[ + { + "locales": [ + { + "region": "FR", + "language": "fr" + }, { + "region": "US", + "language": "en" + } + ] + }, { + "locales": [ + { + "region": "CA", + "language": "fr" + }, + { + "region": "JP", + "language": "ja" + } + ] + } +] +</pre> +</code> + + <p> + The processor is configured with "Replace Existing Values" as "Record Update Strategy", + and two user-defined properties are added: "region" => "/locales[*]/region" and "language" + => "/locales[*]/language". + </p> + + <p> + When triggered, the processor will loop over the user-defined properties.
First, it'll + search for the fields corresponding to "/locales[*]/region", for each value from the + record, the value will be used as the key with the Lookup Service and the value will + be replaced by the result returned by the Lookup Service. Example: the first region is + "FR" and this key is associated to the value "France" in the Lookup Service, so the + value "FR" is replaced by "France" in the record. With the above examples, it will + produce: + </p> + +<code> +<pre> +[ + { + "locales": [ + { + "region": "France", + "language": "French" + }, { + "region": "US", + "language": "English" + } + ] + }, { + "locales": [ + { + "region": "Canada", + "language": "French" + }, + { + "region": "JP", + "language": "ja" + } + ] + } +] +</pre> +</code> + + </body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index f8fb158..86bba8a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -19,9 +19,13 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; import 
org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -37,6 +41,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -436,6 +442,88 @@ public class TestLookupRecord { out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n"); } + @Test + public void testLookupArray() throws InitializationException, IOException { + TestRunner runner = TestRunners.newTestRunner(LookupRecord.class); + final MapLookup lookupService = new MapLookup(); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + + runner.addControllerService("reader", jsonReader); + runner.enableControllerService(jsonReader); + runner.addControllerService("writer", jsonWriter); + runner.enableControllerService(jsonWriter); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS); + runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES); + runner.setProperty(LookupRecord.RECORD_READER, "reader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); + runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); + runner.setProperty("lookupLanguage", "/locales[*]/language"); + runner.setProperty("lookupRegion", "/locales[*]/region"); + 
runner.setProperty("lookupFoo", "/foo/foo"); + + lookupService.addValue("FR", "France"); + lookupService.addValue("CA", "Canada"); + lookupService.addValue("fr", "French"); + lookupService.addValue("key", "value"); + + runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath()); + runner.run(); + + runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS); + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0); + out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath()); + } + + @Test + public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException { + TestRunner runner = TestRunners.newTestRunner(LookupRecord.class); + final MapLookup lookupService = new MapLookup(); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + + runner.addControllerService("reader", jsonReader); + runner.enableControllerService(jsonReader); + runner.addControllerService("writer", jsonWriter); + runner.enableControllerService(jsonWriter); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); + runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES); + runner.setProperty(LookupRecord.RECORD_READER, "reader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); + runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); + 
runner.setProperty("lookupLanguage", "/locales[*]/language"); + runner.setProperty("lookupRegion", "/locales[*]/region"); + runner.setProperty("lookupFoo", "/foo/foo"); + + lookupService.addValue("FR", "France"); + lookupService.addValue("CA", "Canada"); + lookupService.addValue("fr", "French"); + lookupService.addValue("badkey", "value"); + + runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath()); + runner.run(); + + runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED); + } + private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map<String, String> values = new HashMap<>(); private Map<String, Object> expectedContext; @@ -449,6 +537,7 @@ public class TestLookupRecord { return String.class; } + @Override public Optional<String> lookup(final Map<String, Object> coordinates, Map<String, String> context) { validateContext(context); return lookup(coordinates); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json new file mode 100644 index 0000000..f2902cd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json @@ -0,0 +1,29 @@ +[ + { + "foo" : { + "foo" : "key" + }, + "locales": [ + { + "language" : "fr", + "region" : "CA" + }, { + "language" : "fr", + "region" : "FR" + } + ] + }, { + "foo" : { + "foo" : "key" + }, + "locales": [ + { + "language" : "fr", + "region" : "CA" + }, { + "language" : "fr", + "region" : "FR" + } + ] + } +] diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json new file mode 100644 index 0000000..10169f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json @@ -0,0 +1 @@ +[{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]},{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]}] \ No newline at end of file
