This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 3d607a7b6f NIFI-12230 Add configurable Log Level for IP not found in
GeoEnrichIP
3d607a7b6f is described below
commit 3d607a7b6f37f1d24427e499df87362e45a3dfbb
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Oct 19 22:12:34 2023 +0200
NIFI-12230 Add configurable Log Level for IP not found in GeoEnrichIP
NIFI-12253 Route to not found relationship instead of rolling back in
GeoEnrichIPRecord
This closes #7909
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 184757fede3e253ba47552ab8e6fe8bd75f8f49f)
---
.../apache/nifi/processors/AbstractEnrichIP.java | 15 ++++++++++
.../org/apache/nifi/processors/GeoEnrichIP.java | 22 +++++++++++++++
.../apache/nifi/processors/GeoEnrichIPRecord.java | 32 +++++++++++++++++++---
.../nifi/processors/TestGeoEnrichIPRecord.java | 10 +++++--
4 files changed, 73 insertions(+), 6 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
index c29553121b..663c02e90a 100644
---
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
+++
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
@@ -70,6 +70,16 @@ public abstract class AbstractEnrichIP extends
AbstractProcessor {
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.build();
+ public static final PropertyDescriptor LOG_LEVEL = new
PropertyDescriptor.Builder()
+ .name("Log Level")
+ .displayName("Log Level")
+ .required(true)
+ .description("The Log Level to use when an IP is not found in the
database. Accepted values: INFO, DEBUG, WARN, ERROR.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue(MessageLogLevel.WARN.toString())
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
public static final Relationship REL_FOUND = new Relationship.Builder()
.name("found")
.description("Where to route flow files after successfully
enriching attributes with data provided by database")
@@ -80,6 +90,10 @@ public abstract class AbstractEnrichIP extends
AbstractProcessor {
.description("Where to route flow files after unsuccessfully
enriching attributes because no data was found")
.build();
+ enum MessageLogLevel {
+ DEBUG, INFO, WARN, ERROR
+ }
+
private Set<Relationship> relationships;
private List<PropertyDescriptor> propertyDescriptors;
final AtomicReference<DatabaseReader> databaseReaderRef = new
AtomicReference<>(null);
@@ -134,6 +148,7 @@ public abstract class AbstractEnrichIP extends
AbstractProcessor {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(GEO_DATABASE_FILE);
props.add(IP_ADDRESS_ATTRIBUTE);
+ props.add(LOG_LEVEL);
this.propertyDescriptors = Collections.unmodifiableList(props);
}
diff --git
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
index 74ee2ec9d7..0948a15c82 100644
---
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
+++
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors;
import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.DatabaseReader;
+import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.Subdivision;
@@ -95,6 +96,7 @@ public class GeoEnrichIP extends AbstractEnrichIP {
}
DatabaseReader dbReader = databaseReaderRef.get();
+ final MessageLogLevel logLevel =
MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(flowFile).getValue().toUpperCase());
final String ipAttributeName =
context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
@@ -132,6 +134,26 @@ public class GeoEnrichIP extends AbstractEnrichIP {
getLogger().warn("Failure while trying to load enrichment data for
{} due to {}, rolling back session "
+ "and will reload the database on the next run",
flowFile, idbe.getMessage());
session.rollback();
+ return;
+ } catch (AddressNotFoundException anfe) {
+ session.transfer(flowFile, REL_NOT_FOUND);
+
+ switch (logLevel) {
+ case INFO:
+ getLogger().info("Address not found in the database",
anfe);
+ break;
+ case WARN:
+ getLogger().warn("Address not found in the database",
anfe);
+ break;
+ case ERROR:
+ getLogger().error("Address not found in the database",
anfe);
+ break;
+ case DEBUG:
+ default:
+ getLogger().debug("Address not found in the database",
anfe);
+ break;
+ }
+
return;
} catch (GeoIp2Exception | IOException ex) {
// Note IOException is captured again as dbReader also makes
InetAddress.getByName() calls.
diff --git
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
index d8c6aec07e..e917b01047 100644
---
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
+++
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors;
import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.DatabaseReader;
+import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -160,7 +161,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
private static final List<PropertyDescriptor> DESCRIPTORS =
Collections.unmodifiableList(Arrays.asList(
GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND,
IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE,
- GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
+ GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE,
LOG_LEVEL
));
@Override
@@ -231,6 +232,8 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
}
String rawIpPath =
context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+ final MessageLogLevel logLevel =
MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(input).getValue().toUpperCase());
+
RecordPath ipPath = cache.getCompiled(rawIpPath);
RecordReader reader = readerFactory.createRecordReader(input, is,
getLogger());
@@ -249,7 +252,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
int notFoundCount = 0;
while ((record = reader.nextRecord()) != null) {
CityResponse response;
- response = geocode(ipPath, record, dbReader);
+ response = geocode(ipPath, record, dbReader, logLevel);
boolean wasEnriched = enrichRecord(response, record, paths);
if (wasEnriched) {
targetRelationship = REL_FOUND;
@@ -314,7 +317,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
return retVal;
}
- private CityResponse geocode(RecordPath ipPath, Record record,
DatabaseReader reader) throws Exception {
+ private CityResponse geocode(RecordPath ipPath, Record record,
DatabaseReader reader, MessageLogLevel logLevel) throws Exception {
RecordPathResult result = ipPath.evaluate(record);
Optional<FieldValue> ipField = result.getSelectedFields().findFirst();
if (ipField.isPresent()) {
@@ -326,7 +329,28 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
String realValue = val.toString();
InetAddress address = InetAddress.getByName(realValue);
- return reader.city(address);
+ try {
+ return reader.city(address);
+ } catch (AddressNotFoundException anfe) {
+
+ switch (logLevel) {
+ case INFO:
+ getLogger().info("Address not found in the database",
anfe);
+ break;
+ case WARN:
+ getLogger().warn("Address not found in the database",
anfe);
+ break;
+ case ERROR:
+ getLogger().error("Address not found in the database",
anfe);
+ break;
+ case DEBUG:
+ default:
+ getLogger().debug("Address not found in the database",
anfe);
+ break;
+ }
+
+ return null;
+ }
} else {
return null;
}
diff --git
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
index c7aebb00d1..20a39098b9 100644
---
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
+++
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
@@ -55,8 +55,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestGeoEnrichIPRecord {
+
private TestRunner runner;
private DatabaseReader reader;
+
@BeforeEach
public void setup() throws Exception {
reader = mock(DatabaseReader.class);
@@ -96,6 +98,7 @@ public class TestGeoEnrichIPRecord {
runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE,
"/geo/country_postal");
runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");
+ runner.setProperty(AbstractEnrichIP.LOG_LEVEL, "WARN");
runner.assertValid();
}
@@ -129,7 +132,7 @@ public class TestGeoEnrichIPRecord {
byte[] raw = runner.getContentAsByteArray(ff);
String content = new String(raw);
ObjectMapper mapper = new ObjectMapper();
- List<Map<String, Object>> result = (List<Map<String,
Object>>)mapper.readValue(content, List.class);
+ List<Map<String, Object>> result = mapper.readValue(content,
List.class);
assertNotNull(result);
assertEquals(1, result.size());
@@ -152,9 +155,11 @@ public class TestGeoEnrichIPRecord {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
- READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND,
GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO,
GEO_POSTAL_CODE
+ READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND,
GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE,
+ GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL
));
}
+ @Override
@OnScheduled
public void onScheduled(ProcessContext context) {
databaseReaderRef.set(reader);
@@ -162,6 +167,7 @@ public class TestGeoEnrichIPRecord {
writerFactory =
context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
splitOutput =
context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
}
+ @Override
protected void loadDatabaseFile() {
// Do nothing, the mock database reader is used
}