This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d96398f NIFI-9200: Free cache on heap after disabling
AbstractCSVLookupService
d96398f is described below
commit d96398feb89a77a27370c47e7e52d7f90721193e
Author: Lehel Boér <[email protected]>
AuthorDate: Wed Sep 8 12:33:52 2021 +0200
NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService
This closes #5372.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../apache/nifi/lookup/CSVRecordLookupService.java | 13 +++++++-
.../nifi/lookup/SimpleCsvFileLookupService.java | 39 +++++++++++++---------
.../nifi/lookup/TestCSVRecordLookupService.java | 38 +++++++++++++++------
.../lookup/TestSimpleCsvFileLookupService.java | 37 +++++++++++++++-----
4 files changed, 91 insertions(+), 36 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
index 388ae68..d640fe9 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
@@ -21,6 +21,7 @@ import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
@@ -138,7 +139,7 @@ public class CSVRecordLookupService extends
AbstractCSVLookupService implements
return Optional.empty();
}
- final String key = (String)coordinates.get(KEY);
+ final String key = (String) coordinates.get(KEY);
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
@@ -159,4 +160,14 @@ public class CSVRecordLookupService extends
AbstractCSVLookupService implements
return REQUIRED_KEYS;
}
+ @OnDisabled
+ public void onDisabled() {
+ cache = null; // NIFI-9200: release the cache reference when the service is disabled so its heap can be freed
+ }
+
+ // VisibleForTesting: reports whether the cache reference is currently set (true when cache is non-null).
+ boolean isCaching() {
+ return cache != null;
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
index 3a9ace3..76185f5 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
@@ -16,6 +16,20 @@
*/
package org.apache.nifi.lookup;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -30,20 +44,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key",
"value"})
@CapabilityDescription("A reloadable CSV file-based lookup service. The first
line of the csv file is considered as " +
"header.")
@@ -111,7 +111,7 @@ public class SimpleCsvFileLookupService extends
AbstractCSVLookupService impleme
}
@OnEnabled
- public void onEnabled(final ConfigurationContext context) throws
IOException, InitializationException {
+ public void onEnabled(final ConfigurationContext context) throws
IOException, InitializationException {
super.onEnabled(context);
this.lookupValueColumn =
context.getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions().getValue();
try {
@@ -148,4 +148,13 @@ public class SimpleCsvFileLookupService extends
AbstractCSVLookupService impleme
return REQUIRED_KEYS;
}
+ @OnDisabled
+ public void onDisabled() {
+ cache = null; // NIFI-9200: release the cache reference when the service is disabled so its heap can be freed
+ }
+
+ // VisibleForTesting: reports whether the cache reference is currently set (true when cache is non-null).
+ boolean isCaching() {
+ return cache != null;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
index 4ef84c6..b95bf07 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
@@ -16,34 +16,34 @@
*/
package org.apache.nifi.lookup;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public class TestCSVRecordLookupService {
private final static Optional<Record> EMPTY_RECORD = Optional.empty();
@Test
- public void testSimpleCsvFileLookupService() throws
InitializationException, IOException, LookupFailureException {
+ public void testSimpleCsvRecordLookupService() throws
InitializationException, IOException, LookupFailureException {
final TestRunner runner =
TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
- runner.addControllerService("csv-file-lookup-service", service);
+ runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE,
"src/test/resources/test.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT,
"RFC4180");
runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN,
"key");
@@ -53,7 +53,7 @@ public class TestCSVRecordLookupService {
final CSVRecordLookupService lookupService =
(CSVRecordLookupService) runner.getProcessContext()
.getControllerServiceLookup()
- .getControllerService("csv-file-lookup-service");
+ .getControllerService("csv-record-lookup-service");
assertThat(lookupService, instanceOf(LookupService.class));
@@ -70,11 +70,11 @@ public class TestCSVRecordLookupService {
}
@Test
- public void testSimpleCsvFileLookupServiceWithCharset() throws
InitializationException, IOException, LookupFailureException {
+ public void testSimpleCsvRecordLookupServiceWithCharset() throws
InitializationException, LookupFailureException {
final TestRunner runner =
TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
- runner.addControllerService("csv-file-lookup-service", service);
+ runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE,
"src/test/resources/test_Windows-31J.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT,
"RFC4180");
runner.setProperty(service, CSVRecordLookupService.CHARSET,
"Windows-31J");
@@ -111,5 +111,21 @@ public class TestCSVRecordLookupService {
assertEquals("my_value with an escaped |.",
my_key.get().getAsString("value"));
}
+ @Test
+ public void testCacheIsClearedWhenDisableService() throws
InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestProcessor.class);
+ final CSVRecordLookupService service = new CSVRecordLookupService();
+ runner.addControllerService("csv-record-lookup-service", service);
+ runner.setProperty(service, CSVRecordLookupService.CSV_FILE,
"src/test/resources/test.csv");
+ runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT,
"RFC4180");
+ runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN,
"key");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.isCaching()); // once enabled, the service must hold a non-null cache
+
+ runner.disableControllerService(service);
+ assertFalse(service.isCaching()); // after disabling, the cache reference must have been cleared
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
index a7a90fe..18bc2a3 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
@@ -16,28 +16,29 @@
*/
package org.apache.nifi.lookup;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public class TestSimpleCsvFileLookupService {
final static Optional<String> EMPTY_STRING = Optional.empty();
@Test
- public void testSimpleCsvFileLookupService() throws
InitializationException, IOException, LookupFailureException {
+ public void testSimpleCsvFileLookupService() throws
InitializationException, LookupFailureException {
final TestRunner runner =
TestRunners.newTestRunner(TestProcessor.class);
final SimpleCsvFileLookupService service = new
SimpleCsvFileLookupService();
@@ -50,9 +51,9 @@ public class TestSimpleCsvFileLookupService {
runner.assertValid(service);
final SimpleCsvFileLookupService lookupService =
- (SimpleCsvFileLookupService) runner.getProcessContext()
- .getControllerServiceLookup()
- .getControllerService("csv-file-lookup-service");
+ (SimpleCsvFileLookupService) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("csv-file-lookup-service");
assertThat(lookupService, instanceOf(LookupService.class));
@@ -106,4 +107,22 @@ public class TestSimpleCsvFileLookupService {
final Optional<String> value =
service.lookup(Collections.singletonMap("key", "my_key"));
assertEquals(Optional.of("my_value with an escaped |."), value);
}
+
+ @Test
+ public void testCacheIsClearedWhenDisableService() throws
InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestProcessor.class);
+ final CSVRecordLookupService service = new CSVRecordLookupService(); // NOTE(review): this test is in TestSimpleCsvFileLookupService but instantiates CSVRecordLookupService, so SimpleCsvFileLookupService.onDisabled() does not appear to be exercised here; confirm whether SimpleCsvFileLookupService was intended
+ runner.addControllerService("csv-file-lookup-service", service);
+ runner.setProperty(service, CSVRecordLookupService.CSV_FILE,
"src/test/resources/test.csv");
+ runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT,
"RFC4180");
+ runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN,
"key");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.isCaching()); // once enabled, the service must hold a non-null cache
+
+ runner.disableControllerService(service);
+
+ assertFalse(service.isCaching()); // after disabling, the cache reference must have been cleared
+ }
}