Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2130#discussion_r137845553
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key",
"value", "record"})
+@CapabilityDescription("A reloadable CSV file-based lookup service. When
the lookup key is found in the CSV file, the remaining columns are returned as
a Record.")
+public class CSVRecordLookupService extends AbstractControllerService
implements RecordLookupService {
+
+ private static final String KEY = "key";
+
+ private static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+ public static final PropertyDescriptor CSV_FILE =
+ new PropertyDescriptor.Builder()
+ .name("csv-file")
+ .displayName("CSV File")
+ .description("A CSV file.")
+ .required(true)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor CSV_FORMAT = new
PropertyDescriptor.Builder()
+ .name("CSV Format")
+ .description("Specifies which \"format\" the CSV data is in,
or specifies if custom formatting should be used.")
+ .expressionLanguageSupported(false)
+
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e ->
e.toString()).collect(Collectors.toSet()))
+ .defaultValue(CSVFormat.Predefined.Default.toString())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
+ new PropertyDescriptor.Builder()
+ .name("lookup-key-column")
+ .displayName("Lookup Key Column")
+ .description("Lookup key column.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor IGNORE_DUPLICATES =
+ new PropertyDescriptor.Builder()
+ .name("ignore-duplicates")
+ .displayName("Ignore Duplicates")
+ .description("Ignore duplicate keys for records in the
CSV file.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ private List<PropertyDescriptor> properties;
+
+ private volatile ConcurrentMap<String, Map<String, Object>> cache;
+
+ private volatile String csvFile;
+
+ private volatile CSVFormat csvFormat;
+
+ private volatile String lookupKeyColumn;
+
+ private volatile boolean ignoreDuplicates;
+
+ private volatile SynchronousFileWatcher watcher;
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private void loadCache() throws IllegalStateException, IOException {
+ if (lock.tryLock()) {
+ try {
+ final ComponentLog logger = getLogger();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Loading lookup table from file: " +
csvFile);
+ }
+
+ final FileReader reader = new FileReader(csvFile);
+ final CSVParser records =
csvFormat.withFirstRecordAsHeader().parse(reader);
+ this.cache = new ConcurrentHashMap<>();
+ for (final CSVRecord record : records) {
+ final String key = record.get(lookupKeyColumn);
+
+ if (StringUtils.isBlank(key)) {
+ throw new IllegalStateException("Empty lookup key
encountered in: " + csvFile);
+ } else if (!ignoreDuplicates &&
this.cache.containsKey(key)) {
+ throw new IllegalStateException("Duplicate lookup
key encountered: " + key + " in " + csvFile);
+ } else if (ignoreDuplicates &&
this.cache.containsKey(key)) {
+ logger.warn("Duplicate lookup key encountered: {}
in {}", new Object[]{key, csvFile});
+ }
+
+ // Put each key/value pair (except the lookup) into
the properties
+ final Map<String, Object> properties = new HashMap<>();
+ record.toMap().forEach((k, v) -> {
+ if (!lookupKeyColumn.equals(k)) {
+ properties.put(k, v);
+ }
+ });
+ cache.put(key, properties);
--- End diff --
We should probably store a Record in this map instead of a Map<String,
Object>, so that we avoid creating a new Record object every time 'lookup' is
called. This would also let us create the RecordSchema object once and reuse
it for every Record, rather than creating a separate RecordSchema object per
record.
---