This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4156cc9f1e feat(integration/nifi): Various improvements for Nifi 
integration
4156cc9f1e is described below

commit 4156cc9f1e65485ad7d21d3630835603727a5104
Author: Unai LerĂ­a Fortea <[email protected]>
AuthorDate: Mon Oct 2 09:01:11 2023 +0200

    feat(integration/nifi): Various improvements for Nifi integration
    
    * Add tag validation
    
    * Major changes
    
    * Rewrite processor to make them more consistent
    * Split processor in smaller funcions
    * Add timestamp field name
    * Allow expression language in connection string
    
    * Update readme
    
    * Add file address access strategy
    
    * Set a better exception attribute name in case of failure
    
    * Update readme
    
    * Move duplicated code to base processor
    
    * Add testing for tag validation on properties and address text
    
    * Add testing for file address access validation
    
    * Clean up
    
    * Add file address strategy test for all processors
---
 plc4j/integrations/apache-nifi/README.md           |  27 ++-
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  | 250 +++++++++++++++++----
 .../plc4x/nifi/Plc4xListenRecordProcessor.java     | 157 ++++++-------
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  |  95 +++-----
 .../plc4x/nifi/Plc4xSinkRecordProcessor.java       | 161 ++++++-------
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    | 105 ++++-----
 .../plc4x/nifi/Plc4xSourceRecordProcessor.java     | 167 +++++++-------
 .../nifi/address/AddressesAccessStrategy.java      |  21 ++
 .../plc4x/nifi/address/AddressesAccessUtils.java   | 100 ++++++---
 .../plc4x/nifi/address/BaseAccessStrategy.java     | 124 ++++++++++
 .../address/DynamicPropertyAccessStrategy.java     |  35 ++-
 .../nifi/address/FilePropertyAccessStrategy.java   |  79 +++++++
 .../nifi/address/TextPropertyAccessStrategy.java   |  40 +++-
 .../nifi/record/Plc4xReadResponseRecordSet.java    |  22 +-
 .../org/apache/plc4x/nifi/record/Plc4xWriter.java  |   4 +-
 .../plc4x/nifi/record/RecordPlc4xWriter.java       |  22 +-
 .../org/apache/plc4x/nifi/record/SchemaCache.java  |   4 +-
 .../nifi/subscription/Plc4xListenerDispatcher.java |   3 +-
 .../org/apache/plc4x/nifi/util/Plc4xCommon.java    |   7 +-
 .../apache/plc4x/nifi/Plc4xSinkProcessorTest.java  |  19 +-
 .../plc4x/nifi/Plc4xSinkRecordProcessorTest.java   |  16 ++
 .../plc4x/nifi/Plc4xSourceProcessorTest.java       |  22 +-
 .../plc4x/nifi/Plc4xSourceRecordProcessorTest.java |  18 +-
 .../plc4x/nifi/address/AccessStrategyTest.java     | 190 ++++++++++++++++
 24 files changed, 1196 insertions(+), 492 deletions(-)

diff --git a/plc4j/integrations/apache-nifi/README.md 
b/plc4j/integrations/apache-nifi/README.md
index a80e6f8ed4..04aeed8654 100644
--- a/plc4j/integrations/apache-nifi/README.md
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -20,8 +20,9 @@ under the License.
 
 ## Common properties
 The following properties applies to all Plc4x Processors:
-* Connection String: A constant connection string such as 
`s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200`.
-* Read/Write/Subscribe timeout (miliseconds): Specifies the time in 
milliseconds for the connection to return a timeout. In case of subscription 
the timeout is used to renew connections.
+* Connection String: A constant connection string such as 
`s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200` or 
a valid Expression Language ([Expression Language NiFi 
documentation](https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html))
 such as `${plc4x.connection_string}`.
+* Timeout (miliseconds): Specifies the time in milliseconds for the connection 
to return a timeout. Is used to renew connections. Can be set with Expression 
Language.
+* Timestamp field name: It defines the name of the field that represents the 
time when the response from the Plc was received. It will be added to the 
attributes or to the record deppending on the processor used.
 * Address Access Strategy: defines how the processor obtains the PLC 
addresses. It can take 2 values:
   * **Properties as Addreses:** 
       For each variable, add a new property to the processor where the 
property name matches the variable name, and the variable value corresponds to 
the address tag. 
@@ -48,6 +49,21 @@ The following properties applies to all Plc4x Processors:
     }
     ```
     If this JSON is in an attribute `plc4x.addresses` it can be accessed with 
*Address Text*=`${plc4x.addresses}`. 
+  
+  * **Address File:**
+    Property *Address File* must be supplied with a path to a file in JSON 
format that contains variable name and address tag. Expression Language is 
supported.
+
+    For example a file in:
+    - *Address File*:```/home/nifi/s7addresses.json```  
+    With the following content
+    ```json
+    {
+      "var1" : "%DB1:DBX0.0:BOOL",
+      "var2" : "%DB1:DBX0.1:BOOL"
+    }
+    ```
+    If the file name is in an attribute `plc4x.addresses_file` it can be 
accessed with *Address File*=`${plc4x.addresses_file}`. 
+
 
 
 When reading from a PLC the response is used to create a mapping between Plc 
types into Avro. The mapping is done as follows:
@@ -87,10 +103,6 @@ Table of data mapping between plc data and Avro types (as 
specified in [Avro spe
 Also, it is important to keep in mind the Processor Scheduling Configuration. 
Using the parameter **Run Schedule** (for example to *1 sec*), the reading 
frequency can be set. Note that by default, this value is defined to 0 sec (as 
fast as possible).
 
 
-## Plc4xSinkProcessor
-
-## Plc4xSourceProcessor
-
 ## Plc4xSinkRecordProcessor
 
 This processor is <ins>record oriented</ins>, reads from a formated input 
flowfile content using a Record Reader (for further information see [NiFi 
Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).
 
@@ -124,6 +136,7 @@ An *example* for reading values from a S7-1200:
 - *PLC connection String:* 
*s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
 - *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
 - *Read timeout (miliseconds):* *10000*
+- *Timestamp field name:* *timestamp*  
 - *var1:* *%DB1:DBX0.0:BOOL*
 - *var2:* *%DB1:DBX0.1:BOOL*
 - *var3:* *%DB1:DBB01:BYTE*
@@ -162,6 +175,6 @@ The output flowfile will contain the PLC read values. This 
information is includ
   "var3" : "\u0005",
   "var5" : 1992,
   "var4" : "4",
-  "ts" : 1628783058433
+  "timestamp" : 1628783058433
 } ]
 ```
\ No newline at end of file
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index a8da46c46f..b2b05635d8 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -18,36 +18,49 @@
  */
 package org.apache.plc4x.nifi;
 
+import java.io.OutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.PlcDriver;
-import org.apache.plc4x.java.api.PlcDriverManager;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
 import org.apache.plc4x.nifi.address.AddressesAccessStrategy;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.DynamicPropertyAccessStrategy;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
 import org.apache.plc4x.nifi.record.SchemaCache;
 
 public abstract class BasePlc4xProcessor extends AbstractProcessor {
@@ -55,28 +68,31 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
     protected List<PropertyDescriptor> properties;
     protected Set<Relationship> relationships;
     protected volatile boolean debugEnabled;
-  
-    protected String connectionString;
-    protected Map<String, String> addressMap;
-    protected Long timeout;
+    protected Integer cacheSize = 0;
 
     protected final SchemaCache schemaCache = new SchemaCache(0);
+    protected AddressesAccessStrategy addressAccessStrategy;
 
-    private final PlcConnectionManager connectionManager = 
CachedPlcConnectionManager.getBuilder()
-        .withMaxLeaseTime(Duration.ofSeconds(1000L))
-        .withMaxWaitTime(Duration.ofSeconds(500L))
-        .build();
+    private CachedPlcConnectionManager connectionManager;
 
-    protected static final List<AllowableValue> addressAccessStrategy = 
Collections.unmodifiableList(Arrays.asList(
-        AddressesAccessUtils.ADDRESS_PROPERTY,
-        AddressesAccessUtils.ADDRESS_TEXT));
+    protected CachedPlcConnectionManager getConnectionManager() {
+        return connectionManager;
+    }
+
+    protected void refreshConnectionManager() {
+        connectionManager = CachedPlcConnectionManager.getBuilder()
+            .withMaxLeaseTime(Duration.ofSeconds(1000L))
+            .withMaxWaitTime(Duration.ofSeconds(500L))
+            .build();
+    }
 
 
-       protected static final PropertyDescriptor PLC_CONNECTION_STRING = new 
PropertyDescriptor.Builder()
+       public static final PropertyDescriptor PLC_CONNECTION_STRING = new 
PropertyDescriptor.Builder()
         .name("plc4x-connection-string")
         .displayName("PLC connection String")
         .description("PLC4X connection string used to connect to a given PLC 
device.")
         .required(true)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(new Plc4xConnectionStringValidator())
         .build();
        
@@ -86,6 +102,7 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
                .description("Maximum number of entries in the cache. Can 
improve performance when addresses change dynamically.")
                .defaultValue("1")
                .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                .build();
 
@@ -95,9 +112,20 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
                .description( "Request timeout in miliseconds")
                .defaultValue("10000")
                .required(true)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                .build();
 
+    public static final PropertyDescriptor PLC_TIMESTAMP_FIELD_NAME = new 
PropertyDescriptor.Builder()
+        .name("plc4x-timestamp-field-name")
+        .displayName("Timestamp Field Name")
+        .description("Name of the field that will display the timestamp of the 
operation.")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(new Plc4xTimestampFieldValidator())
+        .defaultValue("ts")
+        .build();
+
 
     protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
            .name("success")
@@ -117,8 +145,10 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
        properties.add(PLC_CONNECTION_STRING);
         properties.add(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY);
         properties.add(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        properties.add(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
         properties.add(PLC_SCHEMA_CACHE_SIZE);
         properties.add(PLC_FUTURE_TIMEOUT_MILISECONDS);
+        properties.add(PLC_TIMESTAMP_FIELD_NAME);
         this.properties = Collections.unmodifiableList(properties);
 
        
@@ -129,12 +159,19 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
     }
 
     public Map<String, String> getPlcAddressMap(ProcessContext context, 
FlowFile flowFile) {
-        AddressesAccessStrategy strategy = 
AddressesAccessUtils.getAccessStrategy(context);
-        return strategy.extractAddresses(context, flowFile);
+        return addressAccessStrategy.extractAddresses(context, flowFile);
     }
     
-    public String getConnectionString() {
-        return connectionString;
+    public String getConnectionString(ProcessContext context, FlowFile 
flowFile) {
+        return 
context.getProperty(PLC_CONNECTION_STRING).evaluateAttributeExpressions(flowFile).getValue();
+    }
+
+    public Long getTimeout(ProcessContext context, FlowFile flowFile) {
+        return 
context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).evaluateAttributeExpressions(flowFile).asLong();
+    }
+
+    public String getTimestampField(ProcessContext context) {
+        return 
context.getProperty(PLC_TIMESTAMP_FIELD_NAME).evaluateAttributeExpressions().getValue();
     }
 
     public SchemaCache getSchemaCache() {
@@ -151,14 +188,14 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         return properties;
     }
     
-    //dynamic prop
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
-                .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                 
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
                 .dependsOn(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, 
AddressesAccessUtils.ADDRESS_PROPERTY)
+                .addValidator(new 
DynamicPropertyAccessStrategy.TagValidator(AddressesAccessUtils.getManager()))
                 .required(false)
                 .dynamic(true)
                 .build();
@@ -167,10 +204,14 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-               connectionString = 
context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
-        
schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger());
+        Integer newCacheSize = 
context.getProperty(PLC_SCHEMA_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        if (!newCacheSize.equals(cacheSize)){
+            schemaCache.restartCache(newCacheSize);
+            cacheSize = newCacheSize;
+        }
+        refreshConnectionManager();
         debugEnabled = getLogger().isDebugEnabled();
-        timeout = 
context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS.getName()).asLong();
+        addressAccessStrategy = 
AddressesAccessUtils.getAccessStrategy(context);
     }
 
     @Override
@@ -186,37 +227,162 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         }
         BasePlc4xProcessor that = (BasePlc4xProcessor) o;
         return Objects.equals(properties, that.properties) &&
-            Objects.equals(getRelationships(), that.getRelationships()) &&
-            Objects.equals(getConnectionString(), that.getConnectionString());
+            Objects.equals(getRelationships(), that.getRelationships());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), properties, getRelationships(), 
getConnectionString());
+        return Objects.hash(super.hashCode(), properties, getRelationships());
     }
 
-    public static class Plc4xConnectionStringValidator implements Validator {
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            // TODO: Add validation here ...
-            return new 
ValidationResult.Builder().subject(subject).explanation("").valid(true).build();
+    protected PlcWriteRequest getWriteRequest(final ComponentLog logger,
+            final Map<String, String> addressMap, final Map<String, PlcTag> 
tags, final Map<String, ? extends Object> presentTags,
+            final PlcConnection connection, final AtomicLong nrOfRowsHere) {
+
+        PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
+
+        if (tags != null){
+            for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+                if (presentTags.containsKey(tag.getKey())) {
+                    builder.addTag(tag.getKey(), tag.getValue(), 
presentTags.get(tag.getKey()));
+                    if (nrOfRowsHere != null) {
+                        nrOfRowsHere.incrementAndGet();
+                    }
+                } else {
+                    if (debugEnabled)
+                        logger.debug("PlcTag " + tag + " is declared as 
address but was not found on input record.");
+                }
+            }
+        } else {
+            if (debugEnabled)
+                logger.debug("PlcTypes resolution not found in cache and will 
be added with key: " + addressMap);
+            for (Map.Entry<String,String> entry: addressMap.entrySet()){
+                if (presentTags.containsKey(entry.getKey())) {
+                    builder.addTagAddress(entry.getKey(), entry.getValue(), 
presentTags.get(entry.getKey()));
+                    if (nrOfRowsHere != null) {
+                        nrOfRowsHere.incrementAndGet();
+                    }
+                }
+            }
+        }
+         
+        return builder.build();
+    }
+
+    protected PlcReadRequest getReadRequest(final ComponentLog logger, 
+            final Map<String, String> addressMap, final Map<String, PlcTag> 
tags,
+            final PlcConnection connection) {
+
+        PlcReadRequest.Builder builder = connection.readRequestBuilder();
+
+        if (tags != null){
+            for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+                builder.addTag(tag.getKey(), tag.getValue());
+            }
+        } else {
+            if (debugEnabled)
+                logger.debug("Plc-Avro schema and PlcTypes resolution not 
found in cache and will be added with key: " + addressMap);
+            for (Map.Entry<String,String> entry: addressMap.entrySet()){
+                builder.addTagAddress(entry.getKey(), entry.getValue());
+            }
+        }
+        return builder.build();
+       }
+
+    protected void evaluateWriteResponse(final ComponentLog logger, 
Map<String, ? extends Object> values, PlcWriteResponse plcWriteResponse) {
+
+               boolean codeErrorPresent = false;
+               List<String> tagsAtError = null;
+
+               PlcResponseCode code = null;
+
+               for (String tag : plcWriteResponse.getTagNames()) {
+                       code = plcWriteResponse.getResponseCode(tag);
+                       if (!code.equals(PlcResponseCode.OK)) {
+                               if (tagsAtError == null) {
+                                       tagsAtError = new ArrayList<>();
+                               }
+                               logger.error("Not OK code when writing the data 
to PLC for tag " + tag 
+                                       + " with value  " + 
values.get(tag).toString() 
+                                       + " in addresss " + 
plcWriteResponse.getTag(tag).getAddressString());
+                               
+                       codeErrorPresent = true;
+                       tagsAtError.add(tag);
+                                               
+                       }
+               }
+               if (codeErrorPresent) {
+                       throw new ProcessException("At least one error was 
found when while writting tags: " + tagsAtError.toString());
+               }
+       }
+
+   protected void evaluateReadResponse(final ProcessSession session, final 
FlowFile flowFile, final PlcReadResponse response) {
+        Map<String, String> attributes = new HashMap<>();
+        for (String tagName : response.getTagNames()) {
+            for (int i = 0; i < response.getNumberOfValues(tagName); i++) {
+                Object value = response.getObject(tagName, i);
+                attributes.put(tagName, String.valueOf(value));
+            }
         }
+        session.putAllAttributes(flowFile, attributes);
     }
 
-    public static class Plc4xAddressStringValidator implements Validator {
+    protected long evaluateReadResponse(final ProcessContext context, final 
ComponentLog logger, final FlowFile originalFlowFile,
+                       Plc4xWriter plc4xWriter, OutputStream out, final 
RecordSchema recordSchema, PlcReadResponse readResponse)
+                       throws Exception {
+
+               if(originalFlowFile == null) //there is no inherit attributes 
to use in writer service 
+                       return plc4xWriter.writePlcReadResponse(readResponse, 
out, logger, null, recordSchema, getTimestampField(context));
+               else 
+                       return plc4xWriter.writePlcReadResponse(readResponse, 
out, logger, null, recordSchema, originalFlowFile, getTimestampField(context));
+       }
+
+    protected static class Plc4xConnectionStringValidator implements Validator 
{
         @Override
         public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            // TODO: Add validation here ...
-            return new 
ValidationResult.Builder().subject(subject).explanation("").valid(true).build();
+            DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
+            
+            if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
+                return new 
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
 Language Present").valid(true).build();
+            }
+            try {
+                PlcDriver driver =  manager.getDriverForUrl(input);
+                driver.getConnection(input);
+            } catch (PlcConnectionException e) {
+                return new ValidationResult.Builder().subject(subject)
+                    .explanation(e.getMessage())
+                    .valid(false)
+                    .build();
+            }
+            return new ValidationResult.Builder().subject(subject)
+                .explanation("")
+                .valid(true)
+                .build();
         }
     }
 
-    protected PlcConnectionManager getConnectionManager() {
-        return connectionManager;
-    }
+    protected static class Plc4xTimestampFieldValidator implements Validator {
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
 
-    protected PlcDriver getDriver() throws PlcConnectionException {
-        return PlcDriverManager.getDefault().getDriverForUrl(connectionString);
-    }
+            if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
+                return new 
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
 Language Present").valid(true).build();
+            }
+            
+            Map<String, String> allProperties = context.getAllProperties();
+            allProperties.remove(subject);
+
+            if (allProperties.containsValue(input)) {
+                return new ValidationResult.Builder().subject(subject)
+                    .explanation("Timestamp field must be unique")
+                    .valid(false)
+                    .build(); 
+            }
+            return new ValidationResult.Builder().subject(subject)
+                .explanation("")
+                .valid(true)
+                .build();
 
+        }
+    }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
index 0664fa0015..4b6e47b719 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -85,6 +84,7 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
        protected Plc4xListenerDispatcher dispatcher;
        protected RecordSchema recordSchema;
        protected Thread readerThread;
+       protected Map<String, String> addressMap;
        final StopWatch executeTime = new StopWatch(false);
 
        public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new 
PropertyDescriptor.Builder()
@@ -112,21 +112,7 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
                .dependsOn(PLC_SUBSCRIPTION_TYPE, 
Plc4xSubscriptionType.CYCLIC.name())
                .required(true)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-               .addValidator(new Validator() {
-                       @Override
-                       public ValidationResult validate(String subject, String 
input, ValidationContext context) {
-                               if 
(context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() > 
Long.valueOf(input)) {
-                                       return new 
ValidationResult.Builder().valid(true).build();
-                               } else {
-                                       return new ValidationResult.Builder()
-                                       .valid(false)
-                                       .input(input)
-                                       
.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
-                                       .explanation(String.format("it must me 
smaller than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
-                                       .build();
-                               }
-                       }       
-               })
+               .addValidator(new CyclycPollingIntervalValidator())
         .defaultValue("10000")
                .build();
 
@@ -151,20 +137,19 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
                super.onScheduled(context);
                subscriptionType = 
Plc4xSubscriptionType.valueOf(context.getProperty(PLC_SUBSCRIPTION_TYPE).getValue());
         cyclingPollingInterval = 
context.getProperty(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL).asLong();
-               addressMap = getPlcAddressMap(context, null);
-
-               createDispatcher(events);
+               createDispatcher(context, events);
        }
 
-    protected void createDispatcher(final BlockingQueue<PlcSubscriptionEvent> 
events) {
+    protected void createDispatcher(final ProcessContext context, final 
BlockingQueue<PlcSubscriptionEvent> events) {
                if (readerThread != null) {
                        return;
                }
 
                // create the dispatcher and calls open() to start listening to 
the plc subscription
-        dispatcher =  new Plc4xListenerDispatcher(timeout, subscriptionType, 
cyclingPollingInterval, getLogger(), events);
+        dispatcher =  new Plc4xListenerDispatcher(getTimeout(context, null), 
subscriptionType, cyclingPollingInterval, getLogger(), events);
                try {
-                       dispatcher.open(getConnectionString(), addressMap);
+                       addressMap = getPlcAddressMap(context, null);
+                       dispatcher.open(getConnectionString(context, null), 
addressMap);
                } catch (Exception e) {
                        if (debugEnabled) {
                                getLogger().debug("Error creating a the 
subscription event dispatcher");
@@ -198,7 +183,7 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
                }
     }
 
-       protected PlcSubscriptionEvent getMessage() {
+       protected PlcSubscriptionEvent getMessage(final ProcessContext context) 
{
                if (readerThread != null && readerThread.isAlive()) {
                        return events.poll();
                        
@@ -208,14 +193,14 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
                        getLogger().debug("Connection to Plc broke. Trying to 
restart connection");
                }
                closeDispatcher();
-               createDispatcher(events);
+               createDispatcher(context, events);
                throw new ProcessException("Connection to Plc broke. Trying to 
restart connection");
        }
        
        @Override
        public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
 
-               DefaultPlcSubscriptionEvent event = 
(DefaultPlcSubscriptionEvent) getMessage();
+               DefaultPlcSubscriptionEvent event = 
(DefaultPlcSubscriptionEvent) getMessage(context);
 
                if (event == null) {
                        return;
@@ -226,70 +211,25 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
 
                final AtomicLong nrOfRows = new AtomicLong(0L);
 
-               FlowFile resultSetFF;
-               resultSetFF = session.create();
+               FlowFile resultSetFF = session.create();
 
                Plc4xWriter plc4xWriter = new 
RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
 Collections.emptyMap());
 
                try {
-                       resultSetFF = session.write(resultSetFF, out -> {
+                       session.write(resultSetFF, out -> {
                                try {
-                                       
nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null, 
recordSchema));
+                                       
nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null, 
recordSchema, getTimestampField(context)));
                                }  catch (Exception e) {
                                        getLogger().error("Exception reading 
the data from PLC", e);
                                        throw (e instanceof ProcessException) ? 
(ProcessException) e : new ProcessException(e);
                                }
 
                                if (recordSchema == null){
-                                       if (debugEnabled)
-                                               getLogger().debug("Adding 
Plc-Avro schema and PlcTypes resolution into cache with key: " + 
addressMap.toString());
-                                       
-                                       // Add schema to the cache
-                                       LinkedHashSet<String> addressNames = 
new LinkedHashSet<String>();
-                                       
addressNames.addAll(event.getTagNames());
-                                       
-                                       List<PlcTag> addressTags = 
addressNames.stream().map(
-                                               new Function<String,PlcTag>() {
-                                                       @Override
-                                                       public PlcTag 
apply(String addr) {
-                                                               return new 
PlcTag() {
-                                                                       
@Override
-                                                                       public 
String getAddressString() {
-                                                                               
return addr;
-                                                                       }
-
-                                                                       
@Override
-                                                                       public 
PlcValueType getPlcValueType() {
-                                                                               
return event.getPlcValue(addr).getPlcValueType();
-                                                                       }
-                                                               };
-                                                       }
-                                               
}).collect(Collectors.toList()); 
-
-                                       getSchemaCache().addSchema(
-                                               addressMap, 
-                                               addressNames,
-                                               addressTags,
-                                               plc4xWriter.getRecordSchema()
-                                       );
-                                       recordSchema = 
getSchemaCache().retrieveSchema(addressMap);
+                                       addTagsToCache(event, plc4xWriter);
                                }
                        });
-                       long executionTimeElapsed = 
executeTime.getElapsed(TimeUnit.MILLISECONDS);
-                       executeTime.stop();
-                       
-                       final Map<String, String> attributesToAdd = new 
HashMap<>();
-                       attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
-                       attributesToAdd.put(RESULT_LAST_EVENT, 
String.valueOf(executionTimeElapsed));
-
-                       
attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
-                       resultSetFF = session.putAllAttributes(resultSetFF, 
attributesToAdd);
-                       plc4xWriter.updateCounters(session);
-                       getLogger().info("{} contains {} records; transferring 
to 'success'", resultSetFF, nrOfRows.get());
-                       
-                       session.getProvenanceReporter().receive(resultSetFF, 
"Retrieved " + nrOfRows.get() + " rows from subscription", 
executionTimeElapsed);
+                       resultSetFF = completeResultFlowFile(session, nrOfRows, 
resultSetFF, plc4xWriter);
                        session.transfer(resultSetFF, REL_SUCCESS);
-                       session.commitAsync();
 
                        executeTime.start();
 
@@ -298,4 +238,71 @@ public class Plc4xListenRecordProcessor extends 
BasePlc4xProcessor {
                        throw new ProcessException("Got an error while trying 
to get a subscription event", e);
                }
        }
+
+       private void addTagsToCache(DefaultPlcSubscriptionEvent event, 
Plc4xWriter plc4xWriter) {
+               if (debugEnabled)
+                       getLogger().debug("Adding Plc-Avro schema and PlcTypes 
resolution into cache with key: " + addressMap.toString());
+               
+               // Add schema to the cache
+               LinkedHashSet<String> addressNames = new LinkedHashSet<>();
+               addressNames.addAll(event.getTagNames());
+               
+               List<PlcTag> addressTags = addressNames.stream().map(addr -> 
+                               new PlcTag() {
+                                       @Override
+                                       public String getAddressString() {
+                                               return addr;
+                                       }
+
+                                       @Override
+                                       public PlcValueType getPlcValueType() {
+                                               return 
event.getPlcValue(addr).getPlcValueType();
+                                       }
+                               }
+                       ).collect(Collectors.toList()); 
+
+               getSchemaCache().addSchema(
+                       addressMap, 
+                       addressNames,
+                       addressTags,
+                       plc4xWriter.getRecordSchema()
+               );
+               recordSchema = getSchemaCache().retrieveSchema(addressMap);
+       }
+
+       private FlowFile completeResultFlowFile(final ProcessSession session, 
final AtomicLong nrOfRows, FlowFile resultSetFF,
+                       Plc4xWriter plc4xWriter) {
+                               
+               long executionTimeElapsed = 
executeTime.getElapsed(TimeUnit.MILLISECONDS);
+               executeTime.stop();
+               
+               final Map<String, String> attributesToAdd = new HashMap<>();
+               attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
+               attributesToAdd.put(RESULT_LAST_EVENT, 
String.valueOf(executionTimeElapsed));
+
+               attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+               resultSetFF = session.putAllAttributes(resultSetFF, 
attributesToAdd);
+               plc4xWriter.updateCounters(session);
+               getLogger().info("{} contains {} records; transferring to 
'success'", resultSetFF, nrOfRows.get());
+               
+               session.getProvenanceReporter().receive(resultSetFF, "Retrieved 
" + nrOfRows.get() + " rows from subscription", executionTimeElapsed);
+               return resultSetFF;
+       }
+
+
+       protected static class CyclycPollingIntervalValidator implements 
Validator {
+               @Override
+               public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                       if 
(context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() > 
Long.valueOf(input)) {
+                               return new 
ValidationResult.Builder().valid(true).build();
+                       } else {
+                               return new ValidationResult.Builder()
+                               .valid(false)
+                               .input(input)
+                               
.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
+                               .explanation(String.format("it must me smaller 
than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
+                               .build();
+                       }
+               }       
+       }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 9c1cb758e7..f18b35e893 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.nifi;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -36,7 +37,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.model.PlcTag;
 
 @TriggerSerially
@@ -47,83 +47,62 @@ import org.apache.plc4x.java.api.model.PlcTag;
 @ReadsAttributes({@ReadsAttribute(attribute="value", description="some 
value")})
 public class Plc4xSinkProcessor extends BasePlc4xProcessor {
 
+       public static final String EXCEPTION = "plc4x.write.exception";
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile flowFile = session.get();
-        final ComponentLog logger = getLogger();
-
+        
         // Abort if there's nothing to do.
         if (flowFile == null) {
             return;
         }
 
-        // Get an instance of a component able to write to a PLC.
-        try(PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString())) {
+        final ComponentLog logger = getLogger();
+
+        try(PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, flowFile))) {
             if (!connection.getMetadata().canWrite()) {
                 throw new ProcessException("Writing not supported by 
connection");
             }
-
-            // Prepare the request.
-            PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-            Map<String,String> addressMap = getPlcAddressMap(context, 
flowFile);
+            
+            final Map<String,String> addressMap = getPlcAddressMap(context, 
flowFile);
             final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
 
-            if (tags != null){
-                for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
-                    if (flowFile.getAttributes().containsKey(tag.getKey())) {
-                        builder.addTag(tag.getKey(), tag.getValue(), 
flowFile.getAttribute(tag.getKey()));
-                    } else {
-                        if (debugEnabled)
-                            logger.debug("PlcTag " + tag + " is declared as 
address but was not found on input record.");
-                    }
-                }
-            } else {
-                for (Map.Entry<String,String> entry: addressMap.entrySet()){
-                    if (flowFile.getAttributes().containsKey(entry.getKey())) {
-                        builder.addTagAddress(entry.getKey(), 
entry.getValue(), flowFile.getAttribute(entry.getKey()));
-                    }
-                }
-                if (debugEnabled)
-                    logger.debug("PlcTypes resolution not found in cache and 
will be added with key: " + addressMap);
-            }
-           
-            PlcWriteRequest writeRequest = builder.build();
+            PlcWriteRequest writeRequest = getWriteRequest(logger, addressMap, 
tags, flowFile.getAttributes(), connection, null);
 
-            // Send the request to the PLC.
             try {
-                final PlcWriteResponse plcWriteResponse = 
writeRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
-                PlcResponseCode code = null;
-
-                for (String tag : plcWriteResponse.getTagNames()) {
-                    code = plcWriteResponse.getResponseCode(tag);
-                    if (!code.equals(PlcResponseCode.OK)) {
-                        logger.error("Not OK code when writing the data to PLC 
for tag " + tag 
-                                                               + " with value  
" + flowFile.getAttribute(tag)
-                                                               + " in addresss 
" + plcWriteResponse.getTag(tag).getAddressString());
-                        throw new Exception(code.toString());
-                    }
-                }
-                session.transfer(flowFile, REL_SUCCESS);
+                final PlcWriteResponse plcWriteResponse = 
writeRequest.execute().get(getTimeout(context, flowFile), 
TimeUnit.MILLISECONDS);
 
-                if (tags == null){
-                    if (debugEnabled)
-                        logger.debug("Adding PlcTypes resolution into cache 
with key: " + addressMap);
-                    getSchemaCache().addSchema(
-                        addressMap, 
-                        writeRequest.getTagNames(),
-                        writeRequest.getTags(),
-                        null
-                    );
-                }
+                evaluateWriteResponse(logger, flowFile.getAttributes(), 
plcWriteResponse);
+ 
+            } catch (TimeoutException e) {
+                logger.error("Timeout writting the data to the PLC", e);
+                
getConnectionManager().removeCachedConnection(getConnectionString(context, 
flowFile));
+                throw new ProcessException(e);
             } catch (Exception e) {
-                flowFile = session.putAttribute(flowFile, "exception", 
e.getLocalizedMessage());
-                session.transfer(flowFile, REL_FAILURE);
+                logger.error("Exception writting the data to the PLC", e);
+                throw (e instanceof ProcessException) ? (ProcessException) e : 
new ProcessException(e);
             }
 
-        } catch (ProcessException e) {
-            throw e;
+            session.transfer(flowFile, REL_SUCCESS);
+
+            if (tags == null){
+                if (debugEnabled)
+                    logger.debug("Adding PlcTypes resolution into cache with 
key: " + addressMap);
+                getSchemaCache().addSchema(
+                    addressMap, 
+                    writeRequest.getTagNames(),
+                    writeRequest.getTags(),
+                    null
+                );
+            }
+
+
         } catch (Exception e) {
-            throw new ProcessException("Got an error while trying to get a 
connection", e);
+            flowFile = session.putAttribute(flowFile, EXCEPTION, 
e.getLocalizedMessage());
+            session.transfer(flowFile, REL_FAILURE);
+            session.commitAsync();
+            throw (e instanceof ProcessException) ? (ProcessException) e : new 
ProcessException(e);
         }
     }
 
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
index 7f58803b45..661f919266 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.plc4x.nifi;
 
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -52,10 +52,10 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StopWatch;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcTag;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
 
 @TriggerSerially
 @Tags({"plc4x", "put", "sink", "record"})
@@ -73,6 +73,7 @@ public class Plc4xSinkRecordProcessor extends 
BasePlc4xProcessor {
        public static final String RESULT_ROW_COUNT = "plc4x.write.row.count";
        public static final String RESULT_QUERY_EXECUTION_TIME = 
"plc4x.write.query.executiontime";
        public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+       public static final String EXCEPTION = "plc4x.write.exception";
        
        public static final PropertyDescriptor PLC_RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
                        .name("record-reader").displayName("Record Reader")
@@ -105,116 +106,90 @@ public class Plc4xSinkRecordProcessor extends 
BasePlc4xProcessor {
         }
                        
                final ComponentLog logger = getLogger();
+
                // Get an instance of a component able to read from a PLC.
                final AtomicLong nrOfRows = new AtomicLong(0L);
                final StopWatch executeTime = new StopWatch(true);
 
-               final FlowFile originalFlowFile = fileToProcess;
-
-               InputStream in = session.read(originalFlowFile);
-
-               Record record = null;
-               
-               try (RecordReader recordReader = 
context.getProperty(PLC_RECORD_READER_FACTORY)
-                       .asControllerService(RecordReaderFactory.class)
-                       .createRecordReader(originalFlowFile, in, logger)){
-
-                       while ((record = recordReader.nextRecord()) != null) {
-                               long nrOfRowsHere = 0L;
-                               PlcWriteResponse plcWriteResponse;
-                               PlcWriteRequest writeRequest;
-
-                               Map<String,String> addressMap = 
getPlcAddressMap(context, fileToProcess);
-                               final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
-
-                               try (PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString())) {
-                                       PlcWriteRequest.Builder builder = 
connection.writeRequestBuilder();
-                                       
-                                       
-                                       if (tags != null){
-                                               for (Map.Entry<String,PlcTag> 
tag : tags.entrySet()){
-                                                       if 
(record.toMap().containsKey(tag.getKey())) {
-                                                               
builder.addTag(tag.getKey(), tag.getValue(), record.getValue(tag.getKey()));
-                                                               nrOfRowsHere++;
-                                                       } else {
-                                                               if 
(debugEnabled)
-                                               logger.debug("PlcTag " + tag + 
" is declared as address but was not found on input record.");
-                                                       }
+               try {
+                       session.read(fileToProcess, in -> {
+                               Record record = null;
+                       
+                               try (RecordReader recordReader = 
context.getProperty(PLC_RECORD_READER_FACTORY)
+                                       
.asControllerService(RecordReaderFactory.class)
+                                       .createRecordReader(fileToProcess, in, 
logger)){
+
+                                       while ((record = 
recordReader.nextRecord()) != null) {
+                                               AtomicLong nrOfRowsHere = new 
AtomicLong(0);
+                                               PlcWriteRequest writeRequest;
+
+                                               final Map<String,String> 
addressMap = getPlcAddressMap(context, fileToProcess);
+                                               final Map<String, PlcTag> tags 
= getSchemaCache().retrieveTags(addressMap);
+
+                                               try (PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
fileToProcess))) {
+                                                       
+                                                       writeRequest = 
getWriteRequest(logger, addressMap, tags, record.toMap(), connection, 
nrOfRowsHere);
+
+                                                       PlcWriteResponse 
plcWriteResponse = writeRequest.execute().get(getTimeout(context, 
fileToProcess), TimeUnit.MILLISECONDS);
+
+                                                       // Response check if 
values were written
+                                                       
evaluateWriteResponse(logger, record.toMap(), plcWriteResponse);
+
+                                               } catch (TimeoutException e) {
+                                                       logger.error("Timeout 
writting the data to the PLC", e);
+                                                       
getConnectionManager().removeCachedConnection(getConnectionString(context, 
fileToProcess));
+                                                       throw new 
ProcessException(e);
+                                               } catch (PlcConnectionException 
e) {
+                                                       logger.error("Error 
getting the PLC connection", e);
+                                                       throw new 
ProcessException("Got an a PlcConnectionException while trying to get a 
connection", e);
+                                               } catch (Exception e) {
+                                                       logger.error("Exception 
writting the data to the PLC", e);
+                                                       throw (e instanceof 
ProcessException) ? (ProcessException) e : new ProcessException(e);
                                                }
-                                       } else {
-                                               if (debugEnabled)
-                                       logger.debug("Plc-Avro schema and 
PlcTypes resolution not found in cache and will be added with key: " + 
addressMap);
-                                               for (Map.Entry<String,String> 
entry: addressMap.entrySet()){
-                                                       if 
(record.toMap().containsKey(entry.getKey())) {
-                                                               
builder.addTagAddress(entry.getKey(), entry.getValue(), 
record.getValue(entry.getKey()));
-                                                               nrOfRowsHere++;
-                                                       } else {
-                                                               if 
(debugEnabled)
-                                               logger.debug("PlcTag " + 
entry.getKey() + " with address " + entry.getValue() + " was not found on input 
record.");
-                                                       }
+                                                       
+                                               if (tags == null){
+                                                       if (debugEnabled)
+                                                               
logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
+                                                       
getSchemaCache().addSchema(
+                                                               addressMap, 
+                                                               
writeRequest.getTagNames(),
+                                                               
writeRequest.getTags(),
+                                                               null
+                                                       );
                                                }
-                                       }
-                                       writeRequest = builder.build();
-
-                                       plcWriteResponse = 
writeRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
+                                               
nrOfRows.getAndAdd(nrOfRowsHere.get());
 
+                                               
+                                       }
                                } catch (Exception e) {
-                                       in.close();
-                                       logger.error("Exception writing the 
data to PLC", e);
-                                       session.transfer(originalFlowFile, 
REL_FAILURE);
-                                       session.commitAsync();
                                        throw (e instanceof ProcessException) ? 
(ProcessException) e : new ProcessException(e);
-                               }
-
-                               // Response check if values were written
-                               if (plcWriteResponse != null){
-                                       PlcResponseCode code = null;
-
-                                       for (String tag : 
plcWriteResponse.getTagNames()) {
-                                               code = 
plcWriteResponse.getResponseCode(tag);
-                                               if 
(!code.equals(PlcResponseCode.OK)) {
-                                                       logger.error("Not OK 
code when writing the data to PLC for tag " + tag 
-                                                               + " with value  
" + record.getValue(tag).toString() 
-                                                               + " in addresss 
" + plcWriteResponse.getTag(tag).getAddressString());
-                                                       throw new 
ProcessException("Writing response code for " + 
plcWriteResponse.getTag(tag).getAddressString() + "was " + code.name() + ", 
expected OK");
-                                               }
-                                       }
-                                       if (tags == null && writeRequest != 
null){
-                                               if (debugEnabled)
-                                                       logger.debug("Adding 
PlcTypes resolution into cache with key: " + addressMap);
-                                               getSchemaCache().addSchema(
-                                                       addressMap, 
-                                                       
writeRequest.getTagNames(),
-                                                       writeRequest.getTags(),
-                                                       null
-                                               );
-                                       }
-                                       nrOfRows.getAndAdd(nrOfRowsHere);
-                               }
-                       }
-                       in.close();
-               } catch (Exception e) {
-                       throw new ProcessException(e);
+                               } 
+                       });
+
+               } catch (ProcessException e) {
+                       logger.error("Exception writing the data to the PLC", 
e);
+                       session.putAttribute(fileToProcess, EXCEPTION, 
e.getLocalizedMessage());
+                       session.transfer(fileToProcess, REL_FAILURE);
+                       session.commitAsync();
+                       throw e;
                } 
-               
+
+
                long executionTimeElapsed = 
executeTime.getElapsed(TimeUnit.MILLISECONDS);
                final Map<String, String> attributesToAdd = new HashMap<>();
                attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
                attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, 
String.valueOf(executionTimeElapsed));
                attributesToAdd.put(INPUT_FLOWFILE_UUID, 
fileToProcess.getAttribute(CoreAttributes.UUID.key()));
                
-               FlowFile resultSetFF = 
session.putAllAttributes(originalFlowFile, attributesToAdd);
+               session.putAllAttributes(fileToProcess, attributesToAdd);
 
-               logger.info("Writing {} fields from {} records; transferring to 
'success'", new Object[] { nrOfRows.get(), resultSetFF });
-               // Report a FETCH event if there was an incoming flow file, or 
a RECEIVE event
-               // otherwise
+               session.transfer(fileToProcess, REL_SUCCESS);
+               
+               logger.info("Writing {} fields from {} records; transferring to 
'success'", nrOfRows.get(), fileToProcess);
                if (context.hasIncomingConnection()) {
-                       session.getProvenanceReporter().fetch(resultSetFF, 
"Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
+                       session.getProvenanceReporter().fetch(fileToProcess, 
"Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
                } else {
-                       session.getProvenanceReporter().receive(resultSetFF, 
"Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
+                       session.getProvenanceReporter().receive(fileToProcess, 
"Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
                }
-
-               session.transfer(resultSetFF, BasePlc4xProcessor.REL_SUCCESS);
-               session.commitAsync();
        }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 0f57de46d0..a6f2da2e17 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -18,10 +18,9 @@
  */
 package org.apache.plc4x.nifi;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,70 +45,74 @@ import org.apache.plc4x.java.api.model.PlcTag;
 @WritesAttributes({@WritesAttribute(attribute="value", description="some 
value")})
 public class Plc4xSourceProcessor extends BasePlc4xProcessor {
 
+       public static final String EXCEPTION = "plc4x.read.exception";
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        
+        FlowFile incomingFlowFile = null;
+        if (context.hasIncomingConnection()) {
+            incomingFlowFile = session.get();
+            if (incomingFlowFile == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
 
         final ComponentLog logger = getLogger();
-        // Get an instance of a component able to read from a PLC.
-        try(PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString())) {
+        final FlowFile flowFile = session.create();
+    
+        try(PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
incomingFlowFile))) {
 
-            // Prepare the request.
             if (!connection.getMetadata().canRead()) {
-                throw new ProcessException("Writing not supported by 
connection");
+                throw new ProcessException("Reading not supported by 
connection");
             }
 
-            FlowFile flowFile = session.create();
-            try {
-                PlcReadRequest.Builder builder = 
connection.readRequestBuilder();
-                Map<String,String> addressMap = getPlcAddressMap(context, 
flowFile);
-                final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
+            final Map<String,String> addressMap = getPlcAddressMap(context, 
incomingFlowFile);
+            final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
 
-                if (tags != null){
-                    for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
-                        builder.addTag(tag.getKey(), tag.getValue());
-                    }
-                } else {
-                    if (debugEnabled)
-                        logger.debug("PlcTypes resolution not found in cache 
and will be added with key: " + addressMap);
-                    for (Map.Entry<String,String> entry: 
addressMap.entrySet()){
-                        builder.addTagAddress(entry.getKey(), 
entry.getValue());
-                    }
-                }
 
-                PlcReadRequest readRequest = builder.build();
-                PlcReadResponse response = 
readRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
-                Map<String, String> attributes = new HashMap<>();
-                for (String tagName : response.getTagNames()) {
-                    for (int i = 0; i < response.getNumberOfValues(tagName); 
i++) {
-                        Object value = response.getObject(tagName, i);
-                        attributes.put(tagName, String.valueOf(value));
-                    }
-                }
-                flowFile = session.putAllAttributes(flowFile, attributes); 
-                
-                if (tags == null){
-                    if (debugEnabled)
-                        logger.debug("Adding PlcTypes resolution into cache 
with key: " + addressMap);
-                    getSchemaCache().addSchema(
-                        addressMap, 
-                        readRequest.getTagNames(),
-                        readRequest.getTags(),
-                        null
-                    );
-                }
+            PlcReadRequest readRequest = getReadRequest(logger, addressMap, 
tags, connection);
 
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new ProcessException(e);
-            } catch (ExecutionException e) {
+            try {
+                final PlcReadResponse response = 
readRequest.execute().get(getTimeout(context, incomingFlowFile), 
TimeUnit.MILLISECONDS);
+                
+                evaluateReadResponse(session, flowFile, response);
+                
+            } catch (TimeoutException e) {
+                logger.error("Timeout reading the data from PLC", e);
+                
getConnectionManager().removeCachedConnection(getConnectionString(context, 
incomingFlowFile));
                 throw new ProcessException(e);
+            } catch (Exception e) {
+                logger.error("Exception reading the data from PLC", e);
+                throw (e instanceof ProcessException) ? (ProcessException) e : 
new ProcessException(e);
+            }
+
+            
+            if (incomingFlowFile != null) {
+                session.remove(incomingFlowFile);
             }
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (ProcessException e) {
-            throw e;
+                
+            if (tags == null){
+                if (debugEnabled)
+                    logger.debug("Adding PlcTypes resolution into cache with 
key: " + addressMap);
+                getSchemaCache().addSchema(
+                    addressMap, 
+                    readRequest.getTagNames(),
+                    readRequest.getTags(),
+                    null
+                );
+            }
+            
         } catch (Exception e) {
-            throw new ProcessException("Got an error while trying to get a 
connection", e);
+            session.remove(flowFile);
+            if (incomingFlowFile != null){
+                incomingFlowFile = session.putAttribute(incomingFlowFile, 
EXCEPTION, e.getLocalizedMessage());
+                session.transfer(incomingFlowFile, REL_FAILURE);
+            }
+            session.commitAsync();
+            throw (e instanceof ProcessException) ? (ProcessException) e : new 
ProcessException(e);
         }
     }
-
+    
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 6d2e5c246e..c2e53e7625 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -35,7 +35,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -70,8 +69,10 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
        public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
        public static final String RESULT_QUERY_EXECUTION_TIME = 
"plc4x.read.query.executiontime";
        public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+       public static final String EXCEPTION = "plc4x.read.exception";
 
-       public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new 
PropertyDescriptor.Builder().name("plc4x-record-writer").displayName("Record 
Writer")
+       public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new 
PropertyDescriptor.Builder()
+               .name("plc4x-record-writer").displayName("Record Writer")
                .description("Specifies the Controller Service to use for 
writing results to a FlowFile. The Record Writer may use Inherit Schema to 
emulate the inferred schema behavior, i.e. "
                                + "an explicit schema need not be defined in 
the writer, and will be supplied by the same logic used to infer the schema 
from the column types.")
                .identifiesControllerService(RecordSetWriterFactory.class)
@@ -89,130 +90,114 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
                this.properties = Collections.unmodifiableList(pds);
        }
 
-       @OnScheduled
-       @Override
-       public void onScheduled(final ProcessContext context) {
-               super.onScheduled(context);
-       }
        
        @Override
        public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+               
                FlowFile fileToProcess = null;
-               // TODO: In the future the processor will be configurable to 
get the connection from incoming flowfile
                if (context.hasIncomingConnection()) {
                        fileToProcess = session.get();
-                       // If we have no FlowFile, and all incoming connections 
are self-loops then we
-                       // can continue on.
-                       // However, if we have no FlowFile and we have 
connections coming from other
-                       // Processors, then we know that we should run only if 
we have a FlowFile.
+                       
                        if (fileToProcess == null && 
context.hasNonLoopConnection()) {
                                return;
                        }
                }
-               
-               Plc4xWriter plc4xWriter = new 
RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
 fileToProcess == null ? Collections.emptyMap() : 
fileToProcess.getAttributes());
+
                final ComponentLog logger = getLogger();
+               
+               
                // Get an instance of a component able to read from a PLC.
-               // TODO: Change this to use NiFi service instead of direct 
connection
                final AtomicLong nrOfRows = new AtomicLong(0L);
                final StopWatch executeTime = new StopWatch(true);
 
-               String inputFileUUID = fileToProcess == null ? null : 
fileToProcess.getAttribute(CoreAttributes.UUID.key());
-               Map<String, String> inputFileAttrMap = fileToProcess == null ? 
null : fileToProcess.getAttributes();
-               FlowFile resultSetFF;
+               final FlowFile resultSetFF;
                if (fileToProcess == null) {
                        resultSetFF = session.create();
                } else {
                        resultSetFF = session.create(fileToProcess);
-               }
-               if (inputFileAttrMap != null) {
-                       resultSetFF = session.putAllAttributes(resultSetFF, 
inputFileAttrMap);
+                       session.putAttribute(resultSetFF, INPUT_FLOWFILE_UUID, 
fileToProcess.getAttribute(CoreAttributes.UUID.key()));
                }
 
-               try (PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString())) {
-                       PlcReadRequest.Builder builder = 
connection.readRequestBuilder();
-                       Map<String,String> addressMap = 
getPlcAddressMap(context, fileToProcess);
-                       final RecordSchema recordSchema = 
getSchemaCache().retrieveSchema(addressMap);
-                       final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
+               final FlowFile originalFlowFile = fileToProcess;
 
-                       if (tags != null){
-                               for (Map.Entry<String,PlcTag> tag : 
tags.entrySet()){
-                                       builder.addTag(tag.getKey(), 
tag.getValue());
-                               }
-                       } else {
-                               if (debugEnabled)
-                    logger.debug("Plc-Avro schema and PlcTypes resolution not 
found in cache and will be added with key: " + addressMap);
-                               for (Map.Entry<String,String> entry: 
addressMap.entrySet()){
-                                       builder.addTagAddress(entry.getKey(), 
entry.getValue());
-                               }
-                       }
-            
-                       PlcReadRequest readRequest = builder.build();
-                       final FlowFile originalFlowFile = fileToProcess;
-                       resultSetFF = session.write(resultSetFF, out -> {
-                               try {
-                                       PlcReadResponse readResponse = 
readRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
+               Plc4xWriter plc4xWriter = new 
RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
 
+                       fileToProcess == null ? Collections.emptyMap() : 
fileToProcess.getAttributes());
+
+
+               try {
+                       session.write(resultSetFF, out -> {
+                               final Map<String,String> addressMap = 
getPlcAddressMap(context, originalFlowFile);
+                               final RecordSchema recordSchema = 
getSchemaCache().retrieveSchema(addressMap);
+                               final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
+                               PlcReadRequest readRequest;
+                               Long nrOfRowsHere;
+
+                               try (PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
originalFlowFile))) {
                                        
-                                       if(originalFlowFile == null) //there is 
no inherit attributes to use in writer service 
-                                               
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, 
recordSchema));
-                                       else 
-                                               
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, 
recordSchema, originalFlowFile));
-                               } catch (InterruptedException e) {
-                                       logger.error("InterruptedException 
reading the data from PLC", e);
-                           Thread.currentThread().interrupt();
-                           throw new ProcessException(e);
+                                       readRequest =  getReadRequest(logger, 
addressMap, tags, connection);
+                                       
+                                       PlcReadResponse readResponse = 
readRequest.execute().get(getTimeout(context, originalFlowFile), 
TimeUnit.MILLISECONDS);
+                                                       
+                                       nrOfRowsHere = 
evaluateReadResponse(context, logger, originalFlowFile, plc4xWriter, out, 
recordSchema, readResponse);
+
                                } catch (TimeoutException e) {
                                        logger.error("Timeout reading the data 
from PLC", e);
+                                       
getConnectionManager().removeCachedConnection(getConnectionString(context, 
originalFlowFile));
                                        throw new ProcessException(e);
+                               } catch (PlcConnectionException e) {
+                                       logger.error("Error getting the PLC 
connection", e);
+                                       throw new ProcessException("Got an a 
PlcConnectionException while trying to get a connection", e);
                                } catch (Exception e) {
                                        logger.error("Exception reading the 
data from PLC", e);
                                        throw (e instanceof ProcessException) ? 
(ProcessException) e : new ProcessException(e);
                                }
-                       });
 
-                       if (recordSchema == null){
-                               if (debugEnabled)
-                    logger.debug("Adding Plc-Avro schema and PlcTypes 
resolution into cache with key: " + addressMap);
-                               getSchemaCache().addSchema(
-                                       addressMap, 
-                                       readRequest.getTagNames(),
-                                       readRequest.getTags(),
-                                       plc4xWriter.getRecordSchema()
-                               );
-                       }
-                       long executionTimeElapsed = 
executeTime.getElapsed(TimeUnit.MILLISECONDS);
-                       final Map<String, String> attributesToAdd = new 
HashMap<>();
-                       attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
-                       attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, 
String.valueOf(executionTimeElapsed));
-                       if (inputFileUUID != null) {
-                               attributesToAdd.put(INPUT_FLOWFILE_UUID, 
inputFileUUID);
-                       }
-                       
attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
-                       resultSetFF = session.putAllAttributes(resultSetFF, 
attributesToAdd);
-                       plc4xWriter.updateCounters(session);
-                       logger.info("{} contains {} records; transferring to 
'success'", new Object[] { resultSetFF, nrOfRows.get() });
-                       // Report a FETCH event if there was an incoming flow 
file, or a RECEIVE event
-                       // otherwise
-                       if (context.hasIncomingConnection()) {
-                               
session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + 
nrOfRows.get() + " rows", executionTimeElapsed);
-                       } else {
-                               
session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + 
nrOfRows.get() + " rows", executionTimeElapsed);
-                       }
+                               if (recordSchema == null){
+                                       if (debugEnabled)
+                                               logger.debug("Adding PlcTypes 
resolution into cache with key: " + addressMap);
+                                       getSchemaCache().addSchema(
+                                               addressMap, 
+                                               readRequest.getTagNames(),
+                                               readRequest.getTags(),
+                                               plc4xWriter.getRecordSchema()
+                                       );
+                               }
+                               nrOfRows.set(nrOfRowsHere);
+
+                       });
                        
-                       session.transfer(resultSetFF, 
BasePlc4xProcessor.REL_SUCCESS);
-                       // Need to remove the original input file if it exists
+               } catch (Exception e) {
+                       logger.error("Exception reading the data from the PLC", 
e);
                        if (fileToProcess != null) {
-                               session.remove(fileToProcess);
-                               fileToProcess = null;
+                               session.putAttribute(fileToProcess, EXCEPTION, 
e.getLocalizedMessage());
+                               session.transfer(fileToProcess, REL_FAILURE);
                        }
+                       session.remove(resultSetFF);
                        session.commitAsync();
-                       
-               } catch (PlcConnectionException e) {
-                       logger.error("Error getting the PLC connection", e);
-                       throw new ProcessException("Got an a 
PlcConnectionException while trying to get a connection", e);
-               } catch (Exception e) {
-                       logger.error("Got an error while trying to get a 
connection", e);
-                       throw new ProcessException("Got an error while trying 
to get a connection", e);
+                       throw (e instanceof ProcessException) ? 
(ProcessException) e : new ProcessException(e);
+               }
+
+               plc4xWriter.updateCounters(session);
+               long executionTimeElapsed = 
executeTime.getElapsed(TimeUnit.MILLISECONDS);
+               final Map<String, String> attributesToAdd = new HashMap<>();
+               attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
+               attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, 
String.valueOf(executionTimeElapsed));
+               attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+
+               session.putAllAttributes(resultSetFF, attributesToAdd);
+               
+               logger.info("{} contains {} records; transferring to 
'success'", resultSetFF, nrOfRows.get());
+
+               if (context.hasIncomingConnection()) {
+                       session.getProvenanceReporter().fetch(resultSetFF, 
"Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+               } else {
+                       session.getProvenanceReporter().receive(resultSetFF, 
"Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+               }
+               
+               if (fileToProcess != null) {
+                       session.remove(fileToProcess);
                }
+               session.transfer(resultSetFF, REL_SUCCESS);
        }
+
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
index 434e924d4a..3794c0ba07 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
@@ -16,11 +16,32 @@
  */
 package org.apache.plc4x.nifi.address;
 
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 
+import java.util.List;
 import java.util.Map;
 
 public interface AddressesAccessStrategy {
+    /**
+     * Returns the allowable value associated with the strategy.
+     * @return AllowableValue the allowable value associated
+     */
+    AllowableValue getAllowableValue();
+
+    /**
+     * Returns a list of property descriptors needed in for the strategy.
+     * @return List of PropertyDescriptor needed for the strategy
+     */
+    List<PropertyDescriptor> getPropertyDescriptors();
+    
+    /**
+     * Returns a map with the names and addresses of the tags.
+     * @param context the context of the processor
+     * @param flowFile the FlowFile being processed
+     * @return Map with the tag names and addresses
+     */
     Map<String, String> extractAddresses(final ProcessContext context, final 
FlowFile flowFile);
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
index bfeff23fdd..9b63d584b2 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
@@ -22,45 +22,75 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
 
 public class AddressesAccessUtils {
-    public static final AllowableValue ADDRESS_PROPERTY = new AllowableValue(
-            "property-address",
-            "Use Properties as Addresses",
-            "Each property will be treated as tag-address pairs after 
Expression Language is evaluated.");
 
-    public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
-            "text-address",
-            "Use 'Address Text' Property",
-            "Addresses will be obtained from 'Address Text' Property. It's 
content must be a valid JSON " +
-                    "after Expression Language is evaluated. ");
+       private static DefaultPlcDriverManager manager = new 
DefaultPlcDriverManager();
 
-    public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = new 
PropertyDescriptor.Builder()
-            .name("plc4x-address-access-strategy")
-            .displayName("Address Access Strategy")
-            .description("Strategy used to obtain the PLC addresses")
-            .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT)
-            .defaultValue(ADDRESS_PROPERTY.getValue())
-            .required(true)
-            .build();
+    public static DefaultPlcDriverManager getManager() {
+        return manager;
+    }
+       
+       public static final AllowableValue ADDRESS_PROPERTY = new 
AllowableValue(
+               "property-address",
+               "Use Properties as Addresses",
+               "Each property will be treated as tag-address pairs after 
Expression Language is evaluated.");
 
-    public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new 
PropertyDescriptor.Builder()
-            .name("text-address-property")
-            .displayName("Address Text")
-            .description("Must contain a valid JSON object after Expression 
Language is evaluated. "
-                    + "Each field-value is treated as tag-address.")
-            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new JsonValidator())
-            .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
-            .required(true)
-            .build();
+       public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
+               "text-address",
+               "Use 'Address Text' Property",
+               "Addresses will be obtained from 'Address Text' Property. It's 
content must be a valid JSON " +
+                       "after Expression Language is evaluated. ");
 
-    public static AddressesAccessStrategy getAccessStrategy(final 
ProcessContext context) {
-        String value = 
context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
-        if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
-            return new DynamicPropertyAccessStrategy();
-        else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
-            return new TextPropertyAccessStrategy();
-        return null;
-    }
+       public static final AllowableValue ADDRESS_FILE = new AllowableValue(
+               "file-address",
+               "Use 'Address File' Property",
+               "Addresses will be obtained from the file in 'Address File' 
Property. It's content must be a valid JSON " +
+                       "after Expression Language is evaluated. ");
+
+       public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = 
new PropertyDescriptor.Builder()
+               .name("plc4x-address-access-strategy")
+               .displayName("Address Access Strategy")
+               .description("Strategy used to obtain the PLC addresses")
+               .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT, ADDRESS_FILE)
+               .defaultValue(ADDRESS_PROPERTY.getValue())
+               .required(true)
+               .build();
+
+       public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new 
PropertyDescriptor.Builder()
+               .name("text-address-property")
+               .displayName("Address Text")
+               .description("Must contain a valid JSON object after Expression 
Language is evaluated. "
+                       + "Each field-value is treated as tag-address.")
+               
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+               .addValidator(new JsonValidator())
+               .addValidator(new 
TextPropertyAccessStrategy.TagValidator(manager))
+               .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
+               .required(true)
+               .build();
+
+       public static final PropertyDescriptor ADDRESS_FILE_PROPERTY = new 
PropertyDescriptor.Builder()
+               .name("file-address-property")
+               .displayName("Address File")
+               .description("Must contain a valid path after Expression 
Language is evaluated. "
+                       + "The file content must be a valid JSON and each 
field-value is treated as tag-address.")
+               
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+               .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+               .addValidator(new 
FilePropertyAccessStrategy.TagValidator(manager))
+               .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_FILE)
+               .required(true)
+               .build();
+
+       public static AddressesAccessStrategy getAccessStrategy(final 
ProcessContext context) {
+               String value = 
context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
+               if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
+                       return new DynamicPropertyAccessStrategy();
+               else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
+                       return new TextPropertyAccessStrategy();
+               else if (ADDRESS_FILE.getValue().equalsIgnoreCase(value))
+                       return new FilePropertyAccessStrategy();
+               return null;
+       }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
new file mode 100644
index 0000000000..c94d7d3141
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.nifi.BasePlc4xProcessor;
+
+
+public abstract class BaseAccessStrategy implements AddressesAccessStrategy{
+    private boolean isInitializated = false;
+    private boolean isDynamic;
+    protected Map<String,String> cachedAddresses = null;
+
+    protected AllowableValue allowableValue;
+    protected List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
+
+    protected Map<String, String> getCachedAddresses() {
+        return cachedAddresses;
+    }
+
+    public Map<String,String> extractAddressesFromResources(final 
ProcessContext context, final FlowFile flowFile) {
+        throw new UnsupportedOperationException("Method 
'extractAddressesFromResources' not implemented");
+    }
+
+
+    @Override
+    public Map<String, String> extractAddresses(final ProcessContext context, 
final FlowFile flowFile) {
+        if (!isInitializated) {
+            getPropertyDescriptors().forEach(prop -> {
+                if (context.isExpressionLanguagePresent(prop)){
+                    isDynamic = true;
+                }
+            });
+            isInitializated = true;
+        }
+
+        Map<String, String> result = getCachedAddresses();
+        if (result == null) {
+            result = extractAddressesFromResources(context, flowFile);
+            if (!isDynamic) {
+                cachedAddresses = result;
+            }
+        }
+        return result;
+    }
+
+    public static class TagValidator implements Validator {
+        
+        private DefaultPlcDriverManager manager;
+
+        public TagValidator(DefaultPlcDriverManager manager) {
+            this.manager = manager;
+        }
+
+        protected void checkTags(PlcDriver driver, Collection<String> tags) {
+            for (String tag : tags) {
+                driver.prepareTag(tag);
+            }
+        }
+
+        protected Collection<String> getTags(String input) throws Exception {
+            throw new UnsupportedOperationException("Method 'getTags' not 
implemented");
+        } 
+
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+            String connectionString = 
context.getProperty(BasePlc4xProcessor.PLC_CONNECTION_STRING).getValue();
+
+            if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input) || 
+                context.isExpressionLanguagePresent(connectionString)) {
+                return new 
ValidationResult.Builder().subject(subject).input(input)
+                    .explanation("Expression Language 
Present").valid(true).build();
+            }
+
+            try {
+                PlcDriver driver = manager.getDriverForUrl(connectionString);
+
+                if (!context.isExpressionLanguagePresent(input)) {
+                    checkTags(driver, getTags(input));
+                } 
+                
+            }catch (Exception e) {
+                    return new ValidationResult.Builder().subject(subject)
+                        .explanation(e.getLocalizedMessage())
+                        .valid(false)
+                        .build();
+            }
+            
+            return new ValidationResult.Builder().subject(subject)
+                .explanation("")
+                .valid(true)
+                .build();
+        }
+    }
+    
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
index fe7c3c5db2..879766f6de 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
@@ -17,14 +17,34 @@
 
 package org.apache.plc4x.nifi.address;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
 
-public class DynamicPropertyAccessStrategy implements AddressesAccessStrategy{
+
+public class DynamicPropertyAccessStrategy extends BaseAccessStrategy{
+
+    @Override
+    public AllowableValue getAllowableValue() {
+        return AddressesAccessUtils.ADDRESS_PROPERTY;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return List.of();
+    }
+
+    @Override
+    public Map<String,String> extractAddressesFromResources(final 
ProcessContext context, final FlowFile flowFile) {
+        return extractAddressesFromAttributes(context, flowFile);
+    }
 
     private Map<String,String> extractAddressesFromAttributes(final 
ProcessContext context, final FlowFile flowFile) {
         Map<String,String> addressMap = new HashMap<>();
@@ -35,7 +55,16 @@ public class DynamicPropertyAccessStrategy implements 
AddressesAccessStrategy{
         return addressMap; 
     }
 
-    public Map<String, String> extractAddresses(final ProcessContext context, 
final FlowFile flowFile) {
-        return extractAddressesFromAttributes(context, flowFile);
+
+    public static class TagValidator extends BaseAccessStrategy.TagValidator {
+        public TagValidator(DefaultPlcDriverManager manager) {
+            super(manager);
+        }
+
+        @Override
+        protected Collection<String> getTags(String input) throws Exception {
+            return List.of(input);
+        }
     }
+
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
new file mode 100644
index 0000000000..b3214375b8
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+import java.nio.file.StandardOpenOption;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class FilePropertyAccessStrategy extends BaseAccessStrategy {
+
+    @Override
+    public AllowableValue getAllowableValue() {
+        return AddressesAccessUtils.ADDRESS_FILE;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return List.of(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+    }
+
+    @Override
+    public Map<String, String> extractAddressesFromResources(final 
ProcessContext context, final FlowFile flowFile) throws ProcessException{
+        try {
+            return 
extractAddressesFromFile(context.getProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
+        } catch (Exception e) {
+            throw new ProcessException(e.toString());
+        }
+    }
+
+    public static Map<String,String> extractAddressesFromFile(String fileName) 
throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+
+        Path filePath = Path.of(fileName);
+        InputStream input = Files.newInputStream(filePath, 
StandardOpenOption.READ);
+        
+        return mapper.readerForMapOf(String.class).readValue(input);
+    }
+
+    public static class TagValidator extends BaseAccessStrategy.TagValidator {
+        public TagValidator(DefaultPlcDriverManager manager) {
+            super(manager);
+        }
+
+        @Override
+        protected Collection<String> getTags(String input) throws Exception {
+            return 
FilePropertyAccessStrategy.extractAddressesFromFile(input).values();
+        } 
+    }
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
index 7f5c955401..78149f3614 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
@@ -17,30 +17,54 @@
 
 package org.apache.plc4x.nifi.address;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-
+import org.apache.plc4x.java.DefaultPlcDriverManager;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class TextPropertyAccessStrategy implements AddressesAccessStrategy{
-    private Map<String,String> extractAddressesFromText(String input) throws 
JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+public class TextPropertyAccessStrategy extends BaseAccessStrategy {
 
-        return mapper.readValue(input, Map.class);
+    @Override
+    public AllowableValue getAllowableValue() {
+        return AddressesAccessUtils.ADDRESS_TEXT;
     }
 
     @Override
-    public Map<String, String> extractAddresses(final ProcessContext context, 
final FlowFile flowFile) throws ProcessException{
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return List.of(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+    }
+
+    @Override
+    public Map<String, String> extractAddressesFromResources(final 
ProcessContext context, final FlowFile flowFile) throws ProcessException{
         try {
             return 
extractAddressesFromText(context.getProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
         } catch (Exception e) {
             throw new ProcessException(e.toString());
         }
-        
+    }
+
+    private static Map<String,String> extractAddressesFromText(String input) 
throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+
+        return mapper.readerForMapOf(String.class).readValue(input);
+    }
+
+    public static class TagValidator extends BaseAccessStrategy.TagValidator {
+        public TagValidator(DefaultPlcDriverManager manager) {
+            super(manager);
+        }
+
+        @Override
+        protected Collection<String> getTags(String input) throws Exception {
+            return 
TextPropertyAccessStrategy.extractAddressesFromText(input).values();
+        } 
     }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 69157594ec..f546159e48 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.nifi.record;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -46,12 +47,18 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
     private Set<String> rsColumnNames;
     private boolean moreRows;
     private final boolean debugEnabled = logger.isDebugEnabled();
+    private final String timestampFieldName; 
     private boolean isSubscription = false;
+    private Instant timestamp;
 
        private final AtomicReference<RecordSchema> recordSchema = new 
AtomicReference<>(null);
 
-    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema) throws IOException {
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema, String timestampFieldName) {
+        this.timestampFieldName = timestampFieldName;
         this.readResponse = readResponse;
+        if (!isSubscription) {
+            timestamp = Instant.now();
+        }
         moreRows = true;
         
         isSubscription = readResponse.getRequest() == null;
@@ -68,7 +75,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
         rsColumnNames = responseDataStructure.keySet();
                
         if (recordSchema == null) {
-               Schema avroSchema = 
Plc4xCommon.createSchema(responseDataStructure);            
+               Schema avroSchema = 
Plc4xCommon.createSchema(responseDataStructure, this.timestampFieldName);       
    
                this.recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
         } else {
             this.recordSchema.set(recordSchema);
@@ -78,7 +85,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
 
     }
 
-    public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final 
DefaultPlcSubscriptionEvent subscriptionEvent) throws IOException {;
+    public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final 
DefaultPlcSubscriptionEvent subscriptionEvent) {
         moreRows = true;
         
         if (debugEnabled)
@@ -131,7 +138,7 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
         //do nothing
     }
 
-    protected Record createRecord(final PlcReadResponse readResponse) throws 
IOException{
+    protected Record createRecord(final PlcReadResponse readResponse) {
         final Map<String, Object> values = new 
HashMap<>(getSchema().getFieldCount());
 
         if (debugEnabled)
@@ -158,7 +165,12 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
         }
 
         //add timestamp tag to schema
-        values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, 
System.currentTimeMillis());
+        if (isSubscription) {
+            values.put(timestampFieldName, ((DefaultPlcSubscriptionEvent) 
readResponse).getTimestamp().toEpochMilli());
+        } else {
+            values.put(timestampFieldName, timestamp.toEpochMilli());
+        }
+        
         if (debugEnabled)
             logger.debug("added timestamp tag to record.");
 
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
index a950ac3735..1d774e79ef 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
@@ -39,8 +39,8 @@ public interface Plc4xWriter {
      * @return the number of rows written to the output stream
      * @throws Exception if any errors occur during the writing of the result 
set to the output stream
      */
-    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema) throws Exception;
-    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema, String timestampFieldName) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema, FlowFile originalFlowFile, String 
timestampFieldName) throws Exception;
 
     /**
      * Returns a map of attribute key/value pairs to be added to any outgoing 
flow file(s). The default implementation is to return an empty map.
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index ca8f0267cd..f1e314b5da 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -55,9 +55,11 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     }
 
     @Override
-    public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema) throws Exception {
-        if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
+    public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, 
+                Plc4xReadResponseRowCallback callback, RecordSchema 
recordSchema, String timestampFieldName) throws Exception {
+        
+                    if (fullRecordSet == null) {
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema, 
timestampFieldName);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
         Map<String, String> empty = new HashMap<>();
@@ -73,9 +75,11 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     }
 
     @Override
-    public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception {
-        if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
+    public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, 
+            Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, 
FlowFile originalFlowFile, String timestampFieldName) throws Exception {
+        
+                if (fullRecordSet == null) {
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema, 
timestampFieldName);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
 
@@ -158,8 +162,10 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     private static class Plc4xReadResponseRecordSetWithCallback extends 
Plc4xReadResponseRecordSet {
         private final Plc4xReadResponseRowCallback callback;
 
-        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse 
readResponse, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema) 
throws IOException {
-            super(readResponse, recordSchema);
+        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse 
readResponse, Plc4xReadResponseRowCallback callback, 
+                RecordSchema recordSchema, String timestampFieldName) {
+
+            super(readResponse, recordSchema, timestampFieldName);
             this.callback = callback;
         }
 
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index 801bbd90fb..df80ffd6d6 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -19,9 +19,9 @@
 package org.apache.plc4x.nifi.record;
 
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,7 +60,7 @@ public class SchemaCache {
      * @param tagsList list of PlcTag's
      * @param schema record schema used for PlcResponse serialization. Can be 
null
      */
-    public void addSchema(final Map<String,String> schemaIdentifier, final 
LinkedHashSet<String> tagsNames, final List<? extends PlcTag> tagsList,  final 
RecordSchema schema) {        
+    public void addSchema(final Map<String,String> schemaIdentifier, final 
Set<String> tagsNames, final List<? extends PlcTag> tagsList,  final 
RecordSchema schema) {        
         if (!schemaMap.containsKey(schemaIdentifier.toString())){
             if (nextSchemaPosition.get() == cacheSize.get()){
                 nextSchemaPosition.set(0);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index a46cea7ee5..b46f9f72cc 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -45,7 +45,6 @@ public class Plc4xListenerDispatcher implements Runnable {
     private ComponentLog logger;
     private boolean running = false;
     private BlockingQueue<PlcSubscriptionEvent> events;
-    private PlcSubscriptionResponse subscriptionResponse;
     private PlcConnection connection;
     private Long timeout;
     private BlockingQueue<PlcSubscriptionEvent> queuedEvents;
@@ -95,7 +94,7 @@ public class Plc4xListenerDispatcher implements Runnable {
             }
         }
         PlcSubscriptionRequest subscriptionRequest = builder.build();
-
+        PlcSubscriptionResponse subscriptionResponse;
         try {
             subscriptionResponse = subscriptionRequest.execute().get(timeout, 
TimeUnit.MILLISECONDS);
             
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
index a0fd6c9258..00075f6cfc 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
@@ -53,9 +53,6 @@ import org.apache.plc4x.java.spi.values.PlcWORD;
 
 public class Plc4xCommon {
 
-       
-       public static final String PLC4X_RECORD_TIMESTAMP_FIELD_NAME = "ts";
-       
        /**
         * This method is used to infer output AVRO schema directly from the 
PlcReadResponse object. 
         * It is directly used from the 
RecordPlc4xWriter.writePlcReadResponse() method.
@@ -66,7 +63,7 @@ public class Plc4xCommon {
         * @param responseDataStructure: a map that reflects the structure of 
the answer given by the PLC when making a Read Request.
         * @return AVRO Schema built from responseDataStructure.
         */
-       public static Schema createSchema(Map<String, ? extends PlcValue> 
responseDataStructure){
+       public static Schema createSchema(Map<String, ? extends PlcValue> 
responseDataStructure, String timestampFieldName){
                //plc and record datatype map
                final FieldAssembler<Schema> builder = 
SchemaBuilder.record("PlcReadResponse").namespace("any.data").fields();  
                String fieldName = null;
@@ -108,7 +105,7 @@ public class Plc4xCommon {
                }
                
                //add timestamp tag to schema
-               
builder.name(PLC4X_RECORD_TIMESTAMP_FIELD_NAME).type().longType().noDefault();
+               builder.name(timestampFieldName).type().longType().noDefault();
                
                
                return builder.endRecord();
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
index 90d128a9fb..61601724a4 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
@@ -19,12 +19,16 @@ package org.apache.plc4x.nifi;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,7 +45,7 @@ public class Plc4xSinkProcessorTest {
         testRunner.setValidateExpressionUsage(false);
 
         testRunner.setProperty(Plc4xSinkProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
-        
testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, 
"1000");
+        
testRunner.setProperty(Plc4xSinkProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, 
"1000");
 
         testRunner.addConnection(Plc4xSinkProcessor.REL_SUCCESS);
         testRunner.addConnection(Plc4xSinkProcessor.REL_FAILURE);
@@ -73,4 +77,17 @@ public class Plc4xSinkProcessorTest {
         testProcessor();
     }
 
+    // Test addressess file property access strategy
+    @Test
+    public void testWithAdderessFile() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testProcessor();
+        }
+    }
+
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
index 627ab2e80b..9f22b62bc8 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
@@ -25,9 +25,12 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -91,4 +94,17 @@ public class Plc4xSinkRecordProcessorTest {
                
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new 
ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
                testAvroRecordReaderProcessor();
        }
+
+       // Test addressess file property access strategy
+    @Test
+    public void testWithAdderessFile() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testAvroRecordReaderProcessor();
+        }
+    }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index 331ab51eb1..a5e79ceb03 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -21,9 +21,12 @@ package org.apache.plc4x.nifi;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,10 +40,11 @@ public class Plc4xSourceProcessorTest {
     public void init() {
         testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
         testRunner.setIncomingConnection(false);
-        testRunner.setValidateExpressionUsage(false);
+        testRunner.setValidateExpressionUsage(true);
 
-        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
-        
testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, 
"1000");
+        testRunner.setVariable("url", "simulated://127.0.0.1");
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, 
"${url}");
+        
testRunner.setProperty(Plc4xSourceProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, 
"1000");
 
         testRunner.addConnection(Plc4xSourceProcessor.REL_SUCCESS);
         testRunner.addConnection(Plc4xSourceProcessor.REL_FAILURE);
@@ -69,4 +73,16 @@ public class Plc4xSourceProcessorTest {
         testProcessor();
     }
 
+    // Test addressess file property access strategy
+    @Test
+    public void testWithAdderessFile() {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testProcessor();
+        }
+    }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
index 4e7c43102d..5d47d0612c 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
@@ -23,9 +23,12 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,7 +64,7 @@ public class Plc4xSourceRecordProcessorTest {
        testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE, 
0);
        testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_SUCCESS, 
NUMBER_OF_CALLS);
 
-               
Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceProcessor.REL_SUCCESS),
 false, true);
+               
Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceRecordProcessor.REL_SUCCESS),
 false, true);
     }
 
        // Test dynamic properties addressess access strategy
@@ -79,4 +82,17 @@ public class Plc4xSourceRecordProcessorTest {
         testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new 
ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
         testAvroRecordWriterProcessor();
     }
+
+    // Test addressess file property access strategy
+    @Test
+    public void testWithAdderessFile() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testAvroRecordWriterProcessor();
+        }
+    }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
new file mode 100644
index 0000000000..c244991c61
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
@@ -0,0 +1,190 @@
+package org.apache.plc4x.nifi.address;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.plc4x.nifi.Plc4xSourceProcessor;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class AccessStrategyTest {
+
+    @Mock
+    FilePropertyAccessStrategy testFileObject = new 
FilePropertyAccessStrategy();
+
+    private TestRunner testRunner; 
+
+    // Tests that addresses in dynamic properties are read correctly and 
addresses are cached if no EL is used
+    @Test
+    public void testDynamicPropertyAccessStrategy() {
+
+        DynamicPropertyAccessStrategy testObject = new 
DynamicPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert 
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_PROPERTY);
+        assert testObject.getPropertyDescriptors().isEmpty();
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(k, v));
+               
+        FlowFile flowFile = testRunner.enqueue("");
+        
+        Map<String, String> values = 
testObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+        assertTrue(testObject.getCachedAddresses().equals(values));
+        
assertTrue(testObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+    }
+
+    // Tests incorrect address detection on dynamic properties
+    @Test
+    public void testDynamicPropertyAccessStrategyIncorrect() {
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(k, "no an correct address"));
+
+        testRunner.assertNotValid();
+    }
+
+    // Tests that if EL is present in dynamic properties the processor is valid
+    @Test
+    public void testDynamicPropertyAccessStrategyELPresent() {
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(k, "${attribute}"));
+
+        testRunner.assertValid();
+    }
+
+    // Tests that addresses in text property are read correctly and addresses 
are cached if no EL is used
+    @Test
+    public void testTextPropertyAccessStrategy() throws 
JsonProcessingException {
+
+        TextPropertyAccessStrategy testObject = new 
TextPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert 
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+        assert 
testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new 
ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
+               
+        FlowFile flowFile = testRunner.enqueue("");
+        
+        Map<String, String> values = 
testObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+        assertTrue(testObject.getCachedAddresses().equals(values));
+        
assertTrue(testObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+    }
+
+    
+
+    // Tests incorrect address detection on text property
+    @Test
+    public void testTextPropertyAccessStrategyIncorrect() {
+
+        TextPropertyAccessStrategy testObject = new 
TextPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert 
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+        assert 
testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(), 
"no an correct address"));
+
+        testRunner.assertNotValid();
+
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(), 
"{\"neither\":\"this one\"}"));
+
+        testRunner.assertNotValid();
+    }
+
+    // Tests that if EL is present in text property the processor is valid 
+    @Test
+    public void testTextPropertyAccessStrategyELPresent() {
+
+        TextPropertyAccessStrategy testObject = new 
TextPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
+        
+        assert 
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+        assert 
testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(), 
"${attribute}"));
+
+        testRunner.assertValid();
+    }
+
+    // Tests that addresses in file are read correctly and addresses are 
cached if no EL is used
+    @Test
+    public void testFilePropertyAccessStrategy() throws IOException {
+
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+        assert 
testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+        assert 
testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+
+
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+
+            FlowFile flowFile = testRunner.enqueue("");
+            Map<String, String> values = 
testFileObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+            assertTrue(testFileObject.getCachedAddresses().equals(values));
+            
assertTrue(testFileObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+        }
+    }
+
+    // Tests incorrect address detection on file
+    @Test
+    public void testFilePropertyAccessStrategyIncorrect() throws IOException {
+
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert 
testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+        assert 
testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Map.of("not", "a correct address"));
+
+            testRunner.assertNotValid();
+        }
+    }
+
+    // Tests that if EL is present in file the processor is valid 
+    @Test
+    public void testFilePropertyAccessStrategyELPresent() throws IOException {
+
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
+        
+        assert 
testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+        assert 
testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, 
"file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = 
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> 
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Map.of("EL in use", "${attribute}"));
+
+            testRunner.assertValid();
+        }
+    }
+}

Reply via email to