NIFI-3970 added changes from a code review.

Signed-off-by: Matthew Burgess <[email protected]>

This closes #2334


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bfe92b90
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bfe92b90
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bfe92b90

Branch: refs/heads/master
Commit: bfe92b9000f0d8203cbc1d95a897d96d2c65f092
Parents: 6384037
Author: Mike Thomsen <[email protected]>
Authored: Fri Dec 1 08:45:39 2017 -0500
Committer: Matthew Burgess <[email protected]>
Committed: Mon Dec 11 19:16:34 2017 -0500

----------------------------------------------------------------------
 .../nifi/lookup/CSVRecordLookupService.java     | 53 +++++++++++---------
 1 file changed, 28 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bfe92b90/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
index fdec5f2..b526e25 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
@@ -39,7 +39,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
 import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
 
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -58,7 +57,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", 
"value", "record"})
-@CapabilityDescription("A reloadable CSV file-based lookup service. When the 
lookup key is found in the CSV file, the remaining columns are returned as a 
Record.")
+@CapabilityDescription(
+        "A reloadable CSV file-based lookup service. When the lookup key is 
found in the CSV file, " +
+        "the columns are returned as a Record. All returned fields will be 
strings."
+)
 public class CSVRecordLookupService extends AbstractControllerService 
implements RecordLookupService {
 
     private static final String KEY = "key";
@@ -69,15 +71,16 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
             new PropertyDescriptor.Builder()
                     .name("csv-file")
                     .displayName("CSV File")
-                    .description("A CSV file.")
+                    .description("A CSV file that will serve as the data 
source.")
                     .required(true)
                     .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
                     .expressionLanguageSupported(true)
                     .build();
 
     static final PropertyDescriptor CSV_FORMAT = new 
PropertyDescriptor.Builder()
-            .name("CSV Format")
-            .description("Specifies which \"format\" the CSV data is in, or 
specifies if custom formatting should be used.")
+            .name("csv-format")
+            .displayName("CSV Format")
+            .description("Specifies which \"format\" the CSV data is in.")
             .expressionLanguageSupported(false)
             
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> 
e.toString()).collect(Collectors.toSet()))
             .defaultValue(CSVFormat.Predefined.Default.toString())
@@ -88,7 +91,8 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
             new PropertyDescriptor.Builder()
                     .name("lookup-key-column")
                     .displayName("Lookup Key Column")
-                    .description("Lookup key column.")
+                    .description("The field in the CSV file that will serve as 
the lookup key. " +
+                            "This is the field that will be matched against 
the property specified in the lookup processor.")
                     .required(true)
                     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                     .expressionLanguageSupported(true)
@@ -107,7 +111,7 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
 
     private List<PropertyDescriptor> properties;
 
-    private volatile ConcurrentMap<String, Map<String, Object>> cache;
+    private volatile ConcurrentMap<String, Record> cache;
 
     private volatile String csvFile;
 
@@ -131,15 +135,16 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
 
                 final FileReader reader = new FileReader(csvFile);
                 final CSVParser records = 
csvFormat.withFirstRecordAsHeader().parse(reader);
-                this.cache = new ConcurrentHashMap<>();
+                ConcurrentHashMap<String, Record> cache = new 
ConcurrentHashMap<>();
+                RecordSchema lookupRecordSchema = null;
                 for (final CSVRecord record : records) {
                     final String key = record.get(lookupKeyColumn);
 
                     if (StringUtils.isBlank(key)) {
                         throw new IllegalStateException("Empty lookup key 
encountered in: " + csvFile);
-                    } else if (!ignoreDuplicates && 
this.cache.containsKey(key)) {
+                    } else if (!ignoreDuplicates && cache.containsKey(key)) {
                         throw new IllegalStateException("Duplicate lookup key 
encountered: " + key + " in " + csvFile);
-                    } else if (ignoreDuplicates && 
this.cache.containsKey(key)) {
+                    } else if (ignoreDuplicates && cache.containsKey(key)) {
                         logger.warn("Duplicate lookup key encountered: {} in 
{}", new Object[]{key, csvFile});
                     }
 
@@ -150,9 +155,18 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
                             properties.put(k, v);
                         }
                     });
-                    cache.put(key, properties);
+
+                    if (lookupRecordSchema == null) {
+                        List<RecordField> recordFields = new 
ArrayList<>(properties.size());
+                        properties.forEach((k, v) -> recordFields.add(new 
RecordField(k, RecordFieldType.STRING.getDataType())));
+                        lookupRecordSchema = new 
SimpleRecordSchema(recordFields);
+                    }
+
+                    cache.put(key, new MapRecord(lookupRecordSchema, 
properties));
                 }
 
+                this.cache = cache;
+
                 if (cache.isEmpty()) {
                     logger.warn("Lookup table is empty after reading file: " + 
csvFile);
                 }
@@ -178,7 +192,7 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
     }
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, FileNotFoundException {
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException {
         this.csvFile = context.getProperty(CSV_FILE).getValue();
         this.csvFormat = 
CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
         this.lookupKeyColumn = 
context.getProperty(LOOKUP_KEY_COLUMN).getValue();
@@ -203,25 +217,14 @@ public class CSVRecordLookupService extends 
AbstractControllerService implements
         }
 
         try {
-            if (watcher != null && watcher.checkAndReset()) {
+            if (watcher.checkAndReset()) {
                 loadCache();
             }
         } catch (final IllegalStateException | IOException e) {
             throw new LookupFailureException(e.getMessage(), e);
         }
 
-        final Record lookupRecord;
-        Map<String, Object> recordMap = cache.get(key);
-        if (recordMap != null) {
-            List<RecordField> recordFields = new ArrayList<>(recordMap.size());
-            recordMap.forEach((k, v) -> recordFields.add(new RecordField(k, 
RecordFieldType.STRING.getDataType())));
-            final RecordSchema lookupRecordSchema = new 
SimpleRecordSchema(recordFields);
-            lookupRecord = new MapRecord(lookupRecordSchema, recordMap);
-        } else {
-            lookupRecord = null;
-        }
-
-        return Optional.ofNullable(lookupRecord);
+        return Optional.ofNullable(cache.get(key));
     }
 
     @Override

Reply via email to