This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 9949f079e1 NIFI-11567: Auto-reload database file in GeoEnrichIP
processors (#7266)
9949f079e1 is described below
commit 9949f079e1cb2acdc82ccdb80f03eba42d99253b
Author: Matt Burgess <[email protected]>
AuthorDate: Mon Jun 5 13:54:17 2023 -0400
NIFI-11567: Auto-reload database file in GeoEnrichIP processors (#7266)
NIFI-11567: Auto-reload database file in GeoEnrichIP processors
---
.../apache/nifi/processors/AbstractEnrichIP.java | 39 ++++++++++++-
.../org/apache/nifi/processors/GeoEnrichIP.java | 68 ++++++++++++++++------
.../apache/nifi/processors/GeoEnrichIPRecord.java | 35 ++++++++++-
.../apache/nifi/processors/TestGeoEnrichIP.java | 4 ++
.../nifi/processors/TestGeoEnrichIPRecord.java | 3 +
5 files changed, 128 insertions(+), 21 deletions(-)
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
index ab0fb9faaf..c29553121b 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
@@ -30,9 +30,12 @@ import
org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -40,6 +43,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractEnrichIP extends AbstractProcessor {
@@ -77,6 +83,13 @@ public abstract class AbstractEnrichIP extends
AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> propertyDescriptors;
final AtomicReference<DatabaseReader> databaseReaderRef = new
AtomicReference<>(null);
+ private volatile SynchronousFileWatcher watcher;
+
+ private final ReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock();
+
+ private volatile File dbFile;
+
+ private volatile boolean needsReload = true;
@Override
public Set<Relationship> getRelationships() {
@@ -90,7 +103,12 @@ public abstract class AbstractEnrichIP extends
AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
- final File dbFile =
context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().asResource().asFile();
+ dbFile =
context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().asResource().asFile();
+ this.watcher = new SynchronousFileWatcher(Paths.get(dbFile.toURI()),
new LastModifiedMonitor(), 30000L);
+ loadDatabaseFile();
+ }
+
+ protected void loadDatabaseFile() throws IOException {
final StopWatch stopWatch = new StopWatch(true);
final DatabaseReader reader = new
DatabaseReader.Builder(dbFile).build();
stopWatch.stop();
@@ -119,4 +137,23 @@ public abstract class AbstractEnrichIP extends
AbstractProcessor {
this.propertyDescriptors = Collections.unmodifiableList(props);
}
+ protected SynchronousFileWatcher getWatcher() {
+ return watcher;
+ }
+
+ protected Lock getDbWriteLock() {
+ return dbReadWriteLock.writeLock();
+ }
+
+ protected Lock getDbReadLock() {
+ return dbReadWriteLock.readLock();
+ }
+
+ protected boolean isNeedsReload() {
+ return needsReload;
+ }
+
+ protected void setNeedsReload(final boolean needsReload) {
+ this.needsReload = needsReload;
+ }
}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
index 763bd5b7ff..f30d414218 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors;
+import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
@@ -41,6 +42,7 @@ import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
@EventDriven
@SideEffectFree
@@ -52,17 +54,17 @@ import java.util.concurrent.TimeUnit;
+ "'IP Address Attribute' property. If the name of the attribute
provided is 'X', then the the attributes added by enrichment "
+ "will take the form X.geo.<fieldName>")
@WritesAttributes({
- @WritesAttribute(attribute = "X.geo.lookup.micros", description = "The
number of microseconds that the geo lookup took"),
- @WritesAttribute(attribute = "X.geo.city", description = "The city
identified for the IP address"),
- @WritesAttribute(attribute = "X.geo.accuracy", description = "The accuracy
radius if provided by the database (in Kilometers)"),
- @WritesAttribute(attribute = "X.geo.latitude", description = "The latitude
identified for this IP address"),
- @WritesAttribute(attribute = "X.geo.longitude", description = "The
longitude identified for this IP address"),
- @WritesAttribute(attribute = "X.geo.subdivision.N",
- description = "Each subdivision that is identified for this IP
address is added with a one-up number appended to the attribute name, starting
with 0"),
- @WritesAttribute(attribute = "X.geo.subdivision.isocode.N", description =
"The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
- @WritesAttribute(attribute = "X.geo.country", description = "The country
identified for this IP address"),
- @WritesAttribute(attribute = "X.geo.country.isocode", description = "The
ISO Code for the country identified"),
- @WritesAttribute(attribute = "X.geo.postalcode", description = "The postal
code for the country identified"),})
+ @WritesAttribute(attribute = "X.geo.lookup.micros", description = "The
number of microseconds that the geo lookup took"),
+ @WritesAttribute(attribute = "X.geo.city", description = "The city
identified for the IP address"),
+ @WritesAttribute(attribute = "X.geo.accuracy", description = "The
accuracy radius if provided by the database (in Kilometers)"),
+ @WritesAttribute(attribute = "X.geo.latitude", description = "The
latitude identified for this IP address"),
+ @WritesAttribute(attribute = "X.geo.longitude", description = "The
longitude identified for this IP address"),
+ @WritesAttribute(attribute = "X.geo.subdivision.N",
+ description = "Each subdivision that is identified for this IP
address is added with a one-up number appended to the attribute name, starting
with 0"),
+ @WritesAttribute(attribute = "X.geo.subdivision.isocode.N",
description = "The ISO code for the subdivision that is identified by
X.geo.subdivision.N"),
+ @WritesAttribute(attribute = "X.geo.country", description = "The
country identified for this IP address"),
+ @WritesAttribute(attribute = "X.geo.country.isocode", description =
"The ISO Code for the country identified"),
+ @WritesAttribute(attribute = "X.geo.postalcode", description = "The
postal code for the country identified"),})
public class GeoEnrichIP extends AbstractEnrichIP {
@Override
@@ -72,14 +74,34 @@ public class GeoEnrichIP extends AbstractEnrichIP {
return;
}
- final DatabaseReader dbReader = databaseReaderRef.get();
+ try {
+ if (isNeedsReload() || getWatcher().checkAndReset()) {
+ Lock dbWriteLock = getDbWriteLock();
+ dbWriteLock.lock();
+ try {
+ loadDatabaseFile();
+ setNeedsReload(false);
+ } catch (InternalError | InvalidDatabaseException ie) {
+ // The database was likely changed out while being read,
rollback and try again
+ setNeedsReload(true);
+ session.rollback();
+ return;
+ } finally {
+ dbWriteLock.unlock();
+ }
+ }
+ } catch (final IllegalStateException | IOException e) {
+ throw new ProcessException(e.getMessage(), e);
+ }
+
+ DatabaseReader dbReader = databaseReaderRef.get();
final String ipAttributeName =
context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
if (StringUtils.isEmpty(ipAttributeName)) {
session.transfer(flowFile, REL_NOT_FOUND);
getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing
to failure",
- new Object[]{flowFile,
IP_ADDRESS_ATTRIBUTE.getDisplayName()});
+ flowFile, IP_ADDRESS_ATTRIBUTE.getDisplayName());
return;
}
@@ -93,14 +115,24 @@ public class GeoEnrichIP extends AbstractEnrichIP {
getLogger().warn("Could not resolve the IP for value '{}',
contained within the attribute '{}' in " +
"FlowFile '{}'. This is usually caused by issue
resolving the appropriate DNS record or " +
"providing the processor with an invalid IP
address ",
- new Object[]{ipAttributeValue,
IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
+ new Object[]{ipAttributeValue,
IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
return;
}
- final StopWatch stopWatch = new StopWatch(true);
+ StopWatch stopWatch = new StopWatch(true);
try {
+ getDbReadLock().lock();
response = dbReader.city(inetAddress);
- stopWatch.stop();
+ } catch (InternalError ie) {
+ // The database was likely changed out while being read, rollback
and try again
+ setNeedsReload(true);
+ session.rollback();
+ return;
+ } catch (InvalidDatabaseException idbe) {
+ getLogger().warn("Failure while trying to load enrichment data for
{} due to {}, rolling back session "
+ + "and will reload the database on the next run",
flowFile, idbe.getMessage());
+ session.rollback();
+ return;
} catch (GeoIp2Exception | IOException ex) {
// Note IOException is captured again as dbReader also makes
InetAddress.getByName() calls.
// Most name or IP resolutions failure should have been triggered
in the try loop above but
@@ -108,6 +140,9 @@ public class GeoEnrichIP extends AbstractEnrichIP {
session.transfer(flowFile, REL_NOT_FOUND);
getLogger().warn("Failure while trying to find enrichment data for
{} due to {}", new Object[]{flowFile, ex}, ex);
return;
+ } finally {
+ stopWatch.stop();
+ getDbReadLock().unlock();
}
if (response == null) {
@@ -147,5 +182,4 @@ public class GeoEnrichIP extends AbstractEnrichIP {
session.transfer(flowFile, REL_FOUND);
}
-
}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
index 11d64aad89..d8c6aec07e 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors;
+import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -54,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"geo", "enrich", "ip", "maxmind", "record"})
@@ -194,7 +196,26 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
FlowFile output = session.create(input);
FlowFile notFound = splitOutput ? session.create(input) : null;
- final DatabaseReader dbReader = databaseReaderRef.get();
+ try {
+ if (isNeedsReload() || getWatcher().checkAndReset()) {
+ Lock dbWriteLock = getDbWriteLock();
+ dbWriteLock.lock();
+ try {
+ loadDatabaseFile();
+ setNeedsReload(false);
+ } catch (InternalError | InvalidDatabaseException ie) {
+ // The database was likely changed out while being read,
rollback and try again
+ setNeedsReload(true);
+ session.rollback();
+ return;
+ } finally {
+ dbWriteLock.unlock();
+ }
+ }
+ } catch (final IllegalStateException | IOException e) {
+ throw new ProcessException(e.getMessage(), e);
+ }
+ DatabaseReader dbReader = databaseReaderRef.get();
try (InputStream is = session.read(input);
OutputStream os = session.write(output);
OutputStream osNotFound = splitOutput ? session.write(notFound) :
null) {
@@ -227,12 +248,13 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
int foundCount = 0;
int notFoundCount = 0;
while ((record = reader.nextRecord()) != null) {
- CityResponse response = geocode(ipPath, record, dbReader);
+ CityResponse response;
+ response = geocode(ipPath, record, dbReader);
boolean wasEnriched = enrichRecord(response, record, paths);
if (wasEnriched) {
targetRelationship = REL_FOUND;
}
- if (!splitOutput || (splitOutput && wasEnriched)) {
+ if (!splitOutput || wasEnriched) {
writer.write(record);
foundCount++;
} else {
@@ -270,6 +292,13 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
session.getProvenanceReporter().modifyContent(notFound);
}
session.getProvenanceReporter().modifyContent(output);
+ } catch (InvalidDatabaseException | InternalError idbe) {
+ // The database was likely changed out while being read, rollback
and try again
+ setNeedsReload(true);
+ getLogger().warn("Failure while trying to load enrichment data due
to {}, rolling back session "
+ + "and will reload the database on the next run",
idbe.getMessage());
+ session.rollback();
+ return;
} catch (Exception ex) {
getLogger().error("Error enriching records.", ex);
session.rollback();
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
index 93c4c9bf13..a5edb64a35 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
@@ -282,5 +282,9 @@ public class TestGeoEnrichIP {
public void onScheduled(ProcessContext context) {
databaseReaderRef.set(databaseReader);
}
+
+ protected void loadDatabaseFile() {
+ // Do nothing, the mock database reader is used
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
index 301fc9a4cd..cabd48bd55 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
@@ -162,5 +162,8 @@ public class TestGeoEnrichIPRecord {
writerFactory =
context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
splitOutput =
context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
}
+ protected void loadDatabaseFile() {
+ // Do nothing, the mock database reader is used
+ }
}
}