This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 9949f079e1 NIFI-11567: Auto-reload database file in GeoEnrichIP processors (#7266)
9949f079e1 is described below

commit 9949f079e1cb2acdc82ccdb80f03eba42d99253b
Author: Matt Burgess <[email protected]>
AuthorDate: Mon Jun 5 13:54:17 2023 -0400

    NIFI-11567: Auto-reload database file in GeoEnrichIP processors (#7266)
    
    NIFI-11567: Auto-reload database file in GeoEnrichIP processors
---
 .../apache/nifi/processors/AbstractEnrichIP.java   | 39 ++++++++++++-
 .../org/apache/nifi/processors/GeoEnrichIP.java    | 68 ++++++++++++++++------
 .../apache/nifi/processors/GeoEnrichIPRecord.java  | 35 ++++++++++-
 .../apache/nifi/processors/TestGeoEnrichIP.java    |  4 ++
 .../nifi/processors/TestGeoEnrichIPRecord.java     |  3 +
 5 files changed, 128 insertions(+), 21 deletions(-)

diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
index ab0fb9faaf..c29553121b 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
@@ -30,9 +30,12 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -40,6 +43,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class AbstractEnrichIP extends AbstractProcessor {
 
@@ -77,6 +83,13 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> propertyDescriptors;
     final AtomicReference<DatabaseReader> databaseReaderRef = new 
AtomicReference<>(null);
+    private volatile SynchronousFileWatcher watcher;
+
+    private final ReadWriteLock dbReadWriteLock = new ReentrantReadWriteLock();
+
+    private volatile File dbFile;
+
+    private volatile boolean needsReload = true;
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -90,7 +103,12 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
-        final File dbFile = 
context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().asResource().asFile();
+        dbFile = 
context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().asResource().asFile();
+        this.watcher = new SynchronousFileWatcher(Paths.get(dbFile.toURI()), 
new LastModifiedMonitor(), 30000L);
+        loadDatabaseFile();
+    }
+
+    protected void loadDatabaseFile() throws IOException {
         final StopWatch stopWatch = new StopWatch(true);
         final DatabaseReader reader = new 
DatabaseReader.Builder(dbFile).build();
         stopWatch.stop();
@@ -119,4 +137,23 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
         this.propertyDescriptors = Collections.unmodifiableList(props);
     }
 
+    protected SynchronousFileWatcher getWatcher() {
+        return watcher;
+    }
+
+    protected Lock getDbWriteLock() {
+        return dbReadWriteLock.writeLock();
+    }
+
+    protected Lock getDbReadLock() {
+        return dbReadWriteLock.readLock();
+    }
+
+    protected boolean isNeedsReload() {
+        return needsReload;
+    }
+
+    protected void setNeedsReload(final boolean needsReload) {
+        this.needsReload = needsReload;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
index 763bd5b7ff..f30d414218 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors;
 
+import com.maxmind.db.InvalidDatabaseException;
 import com.maxmind.geoip2.DatabaseReader;
 import com.maxmind.geoip2.exception.GeoIp2Exception;
 import com.maxmind.geoip2.model.CityResponse;
@@ -41,6 +42,7 @@ import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
 @EventDriven
 @SideEffectFree
@@ -52,17 +54,17 @@ import java.util.concurrent.TimeUnit;
         + "'IP Address Attribute' property. If the name of the attribute 
provided is 'X', then the the attributes added by enrichment "
         + "will take the form X.geo.<fieldName>")
 @WritesAttributes({
-    @WritesAttribute(attribute = "X.geo.lookup.micros", description = "The 
number of microseconds that the geo lookup took"),
-    @WritesAttribute(attribute = "X.geo.city", description = "The city 
identified for the IP address"),
-    @WritesAttribute(attribute = "X.geo.accuracy", description = "The accuracy 
radius if provided by the database (in Kilometers)"),
-    @WritesAttribute(attribute = "X.geo.latitude", description = "The latitude 
identified for this IP address"),
-    @WritesAttribute(attribute = "X.geo.longitude", description = "The 
longitude identified for this IP address"),
-    @WritesAttribute(attribute = "X.geo.subdivision.N",
-            description = "Each subdivision that is identified for this IP 
address is added with a one-up number appended to the attribute name, starting 
with 0"),
-    @WritesAttribute(attribute = "X.geo.subdivision.isocode.N", description = 
"The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
-    @WritesAttribute(attribute = "X.geo.country", description = "The country 
identified for this IP address"),
-    @WritesAttribute(attribute = "X.geo.country.isocode", description = "The 
ISO Code for the country identified"),
-    @WritesAttribute(attribute = "X.geo.postalcode", description = "The postal 
code for the country identified"),})
+        @WritesAttribute(attribute = "X.geo.lookup.micros", description = "The 
number of microseconds that the geo lookup took"),
+        @WritesAttribute(attribute = "X.geo.city", description = "The city 
identified for the IP address"),
+        @WritesAttribute(attribute = "X.geo.accuracy", description = "The 
accuracy radius if provided by the database (in Kilometers)"),
+        @WritesAttribute(attribute = "X.geo.latitude", description = "The 
latitude identified for this IP address"),
+        @WritesAttribute(attribute = "X.geo.longitude", description = "The 
longitude identified for this IP address"),
+        @WritesAttribute(attribute = "X.geo.subdivision.N",
+                description = "Each subdivision that is identified for this IP 
address is added with a one-up number appended to the attribute name, starting 
with 0"),
+        @WritesAttribute(attribute = "X.geo.subdivision.isocode.N", 
description = "The ISO code for the subdivision that is identified by 
X.geo.subdivision.N"),
+        @WritesAttribute(attribute = "X.geo.country", description = "The 
country identified for this IP address"),
+        @WritesAttribute(attribute = "X.geo.country.isocode", description = 
"The ISO Code for the country identified"),
+        @WritesAttribute(attribute = "X.geo.postalcode", description = "The 
postal code for the country identified"),})
 public class GeoEnrichIP extends AbstractEnrichIP {
 
     @Override
@@ -72,14 +74,34 @@ public class GeoEnrichIP extends AbstractEnrichIP {
             return;
         }
 
-        final DatabaseReader dbReader = databaseReaderRef.get();
+        try {
+            if (isNeedsReload() || getWatcher().checkAndReset()) {
+                Lock dbWriteLock = getDbWriteLock();
+                dbWriteLock.lock();
+                try {
+                    loadDatabaseFile();
+                    setNeedsReload(false);
+                } catch (InternalError | InvalidDatabaseException ie) {
+                    // The database was likely changed out while being read, 
rollback and try again
+                    setNeedsReload(true);
+                    session.rollback();
+                    return;
+                } finally {
+                    dbWriteLock.unlock();
+                }
+            }
+        } catch (final IllegalStateException | IOException e) {
+            throw new ProcessException(e.getMessage(), e);
+        }
+
+        DatabaseReader dbReader = databaseReaderRef.get();
         final String ipAttributeName = 
context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
         final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
 
         if (StringUtils.isEmpty(ipAttributeName)) {
             session.transfer(flowFile, REL_NOT_FOUND);
             getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing 
to failure",
-                    new Object[]{flowFile, 
IP_ADDRESS_ATTRIBUTE.getDisplayName()});
+                    flowFile, IP_ADDRESS_ATTRIBUTE.getDisplayName());
             return;
         }
 
@@ -93,14 +115,24 @@ public class GeoEnrichIP extends AbstractEnrichIP {
             getLogger().warn("Could not resolve the IP for value '{}', 
contained within the attribute '{}' in " +
                             "FlowFile '{}'. This is usually caused by issue 
resolving the appropriate DNS record or " +
                             "providing the processor with an invalid IP 
address ",
-                            new Object[]{ipAttributeValue, 
IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
+                    new Object[]{ipAttributeValue, 
IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
             return;
         }
 
-        final StopWatch stopWatch = new StopWatch(true);
+        StopWatch stopWatch = new StopWatch(true);
         try {
+            getDbReadLock().lock();
             response = dbReader.city(inetAddress);
-            stopWatch.stop();
+        } catch (InternalError ie) {
+            // The database was likely changed out while being read, rollback 
and try again
+            setNeedsReload(true);
+            session.rollback();
+            return;
+        } catch (InvalidDatabaseException idbe) {
+            getLogger().warn("Failure while trying to load enrichment data for 
{} due to {}, rolling back session "
+                    + "and will reload the database on the next run", 
flowFile, idbe.getMessage());
+            session.rollback();
+            return;
         } catch (GeoIp2Exception | IOException ex) {
             // Note IOException is captured again as dbReader also makes 
InetAddress.getByName() calls.
             // Most name or IP resolutions failure should have been triggered 
in the try loop above but
@@ -108,6 +140,9 @@ public class GeoEnrichIP extends AbstractEnrichIP {
             session.transfer(flowFile, REL_NOT_FOUND);
             getLogger().warn("Failure while trying to find enrichment data for 
{} due to {}", new Object[]{flowFile, ex}, ex);
             return;
+        } finally {
+            stopWatch.stop();
+            getDbReadLock().unlock();
         }
 
         if (response == null) {
@@ -147,5 +182,4 @@ public class GeoEnrichIP extends AbstractEnrichIP {
 
         session.transfer(flowFile, REL_FOUND);
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
index 11d64aad89..d8c6aec07e 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors;
 
+import com.maxmind.db.InvalidDatabaseException;
 import com.maxmind.geoip2.DatabaseReader;
 import com.maxmind.geoip2.model.CityResponse;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -54,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"geo", "enrich", "ip", "maxmind", "record"})
@@ -194,7 +196,26 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
 
         FlowFile output = session.create(input);
         FlowFile notFound = splitOutput ? session.create(input) : null;
-        final DatabaseReader dbReader = databaseReaderRef.get();
+        try {
+            if (isNeedsReload() || getWatcher().checkAndReset()) {
+                Lock dbWriteLock = getDbWriteLock();
+                dbWriteLock.lock();
+                try {
+                    loadDatabaseFile();
+                    setNeedsReload(false);
+                } catch (InternalError | InvalidDatabaseException ie) {
+                    // The database was likely changed out while being read, 
rollback and try again
+                    setNeedsReload(true);
+                    session.rollback();
+                    return;
+                } finally {
+                    dbWriteLock.unlock();
+                }
+            }
+        } catch (final IllegalStateException | IOException e) {
+            throw new ProcessException(e.getMessage(), e);
+        }
+        DatabaseReader dbReader = databaseReaderRef.get();
         try (InputStream is = session.read(input);
              OutputStream os = session.write(output);
              OutputStream osNotFound = splitOutput ? session.write(notFound) : 
null) {
@@ -227,12 +248,13 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
             int foundCount = 0;
             int notFoundCount = 0;
             while ((record = reader.nextRecord()) != null) {
-                CityResponse response = geocode(ipPath, record, dbReader);
+                CityResponse response;
+                response = geocode(ipPath, record, dbReader);
                 boolean wasEnriched = enrichRecord(response, record, paths);
                 if (wasEnriched) {
                     targetRelationship = REL_FOUND;
                 }
-                if (!splitOutput || (splitOutput && wasEnriched)) {
+                if (!splitOutput || wasEnriched) {
                     writer.write(record);
                     foundCount++;
                 } else {
@@ -270,6 +292,13 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
                 session.getProvenanceReporter().modifyContent(notFound);
             }
             session.getProvenanceReporter().modifyContent(output);
+        } catch (InvalidDatabaseException | InternalError idbe) {
+            // The database was likely changed out while being read, rollback 
and try again
+            setNeedsReload(true);
+            getLogger().warn("Failure while trying to load enrichment data due 
to {}, rolling back session "
+                    + "and will reload the database on the next run", 
idbe.getMessage());
+            session.rollback();
+            return;
         } catch (Exception ex) {
             getLogger().error("Error enriching records.", ex);
             session.rollback();
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
index 93c4c9bf13..a5edb64a35 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
@@ -282,5 +282,9 @@ public class TestGeoEnrichIP {
         public void onScheduled(ProcessContext context) {
             databaseReaderRef.set(databaseReader);
         }
+
+        protected void loadDatabaseFile() {
+            //  Do nothing, the mock database reader is used
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
index 301fc9a4cd..cabd48bd55 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
@@ -162,5 +162,8 @@ public class TestGeoEnrichIPRecord {
             writerFactory = 
context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
             splitOutput = 
context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
         }
+        protected void loadDatabaseFile() {
+            //  Do nothing, the mock database reader is used
+        }
     }
 }

Reply via email to