This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 4156cc9f1e feat(integration/nifi): Various improvements for Nifi
integration
4156cc9f1e is described below
commit 4156cc9f1e65485ad7d21d3630835603727a5104
Author: Unai Lería Fortea <[email protected]>
AuthorDate: Mon Oct 2 09:01:11 2023 +0200
feat(integration/nifi): Various improvements for Nifi integration
* Add tag validation
* Major changes
* Rewrite processors to make them more consistent
* Split processors into smaller functions
* Add timestamp field name
* Allow expression language in connection string
* Update readme
* Add file address access strategy
* Set a better exception attribute name in case of failure
* Update readme
* Move duplicated code to base processor
* Add testing for tag validation on properties and address text
* Add testing for file address access validation
* Clean up
* Add file address strategy test for all processors
---
plc4j/integrations/apache-nifi/README.md | 27 ++-
.../org/apache/plc4x/nifi/BasePlc4xProcessor.java | 250 +++++++++++++++++----
.../plc4x/nifi/Plc4xListenRecordProcessor.java | 157 ++++++-------
.../org/apache/plc4x/nifi/Plc4xSinkProcessor.java | 95 +++-----
.../plc4x/nifi/Plc4xSinkRecordProcessor.java | 161 ++++++-------
.../apache/plc4x/nifi/Plc4xSourceProcessor.java | 105 ++++-----
.../plc4x/nifi/Plc4xSourceRecordProcessor.java | 167 +++++++-------
.../nifi/address/AddressesAccessStrategy.java | 21 ++
.../plc4x/nifi/address/AddressesAccessUtils.java | 100 ++++++---
.../plc4x/nifi/address/BaseAccessStrategy.java | 124 ++++++++++
.../address/DynamicPropertyAccessStrategy.java | 35 ++-
.../nifi/address/FilePropertyAccessStrategy.java | 79 +++++++
.../nifi/address/TextPropertyAccessStrategy.java | 40 +++-
.../nifi/record/Plc4xReadResponseRecordSet.java | 22 +-
.../org/apache/plc4x/nifi/record/Plc4xWriter.java | 4 +-
.../plc4x/nifi/record/RecordPlc4xWriter.java | 22 +-
.../org/apache/plc4x/nifi/record/SchemaCache.java | 4 +-
.../nifi/subscription/Plc4xListenerDispatcher.java | 3 +-
.../org/apache/plc4x/nifi/util/Plc4xCommon.java | 7 +-
.../apache/plc4x/nifi/Plc4xSinkProcessorTest.java | 19 +-
.../plc4x/nifi/Plc4xSinkRecordProcessorTest.java | 16 ++
.../plc4x/nifi/Plc4xSourceProcessorTest.java | 22 +-
.../plc4x/nifi/Plc4xSourceRecordProcessorTest.java | 18 +-
.../plc4x/nifi/address/AccessStrategyTest.java | 190 ++++++++++++++++
24 files changed, 1196 insertions(+), 492 deletions(-)
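
Note on the timestamp field check: the new Plc4xTimestampFieldValidator in BasePlc4xProcessor.java rejects a timestamp field name when any other configured property already holds that same string as its value. The following is a minimal sketch of that rule only, assuming plain maps instead of NiFi's ValidationContext. The class name TimestampFieldCheck and the method isUnique are hypothetical and do not appear in the commit.

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampFieldCheck {

    // Hypothetical helper mirroring the rule in the diff: the candidate name is
    // accepted only if no *other* property has it as its value.
    public static boolean isUnique(String subject, String candidate, Map<String, String> allProperties) {
        // Work on a copy so the caller's map is never modified.
        Map<String, String> others = new HashMap<>(allProperties);
        others.remove(subject);
        return !others.containsValue(candidate);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("plc4x-timestamp-field-name", "ts");
        props.put("var1", "%DB1:DBX0.0:BOOL");
        // Only the timestamp property itself holds "ts".
        System.out.println(isUnique("plc4x-timestamp-field-name", "ts", props)); // prints true

        props.put("other", "ts");
        // A second property now holds the same value.
        System.out.println(isUnique("plc4x-timestamp-field-name", "ts", props)); // prints false
    }
}
```

The sketch copies the map before removing the subject entry. This is a defensive choice made here, not something the commit does.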
diff --git a/plc4j/integrations/apache-nifi/README.md
b/plc4j/integrations/apache-nifi/README.md
index a80e6f8ed4..04aeed8654 100644
--- a/plc4j/integrations/apache-nifi/README.md
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -20,8 +20,9 @@ under the License.
## Common properties
The following properties applies to all Plc4x Processors:
-* Connection String: A constant connection string such as
`s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200`.
-* Read/Write/Subscribe timeout (miliseconds): Specifies the time in
milliseconds for the connection to return a timeout. In case of subscription
the timeout is used to renew connections.
+* Connection String: A constant connection string such as
`s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200` or
a valid Expression Language ([Expression Language NiFi
documentation](https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html))
such as `${plc4x.connection_string}`.
+* Timeout (miliseconds): Specifies the time in milliseconds for the connection
to return a timeout. Is used to renew connections. Can be set with Expression
Language.
+* Timestamp field name: It defines the name of the field that represents the
time when the response from the Plc was received. It will be added to the
attributes or to the record deppending on the processor used.
* Address Access Strategy: defines how the processor obtains the PLC
addresses. It can take 2 values:
* **Properties as Addreses:**
For each variable, add a new property to the processor where the
property name matches the variable name, and the variable value corresponds to
the address tag.
@@ -48,6 +49,21 @@ The following properties applies to all Plc4x Processors:
}
```
If this JSON is in an attribute `plc4x.addresses` it can be accessed with
*Address Text*=`${plc4x.addresses}`.
+
+ * **Address File:**
+ Property *Address File* must be supplied with a path to a file in JSON
format that contains variable name and address tag. Expression Language is
supported.
+
+ For example a file in:
+ - *Address File*:```/home/nifi/s7addresses.json```
+ With the following content
+ ```json
+ {
+ "var1" : "%DB1:DBX0.0:BOOL",
+ "var2" : "%DB1:DBX0.1:BOOL"
+ }
+ ```
+ If the file name is in an attribute `plc4x.addresses_file` it can be
accessed with *Address File*=`${plc4x.addresses_file}`.
+
When reading from a PLC the response is used to create a mapping between Plc
types into Avro. The mapping is done as follows:
@@ -87,10 +103,6 @@ Table of data mapping between plc data and Avro types (as
specified in [Avro spe
Also, it is important to keep in mind the Processor Scheduling Configuration.
Using the parameter **Run Schedule** (for example to *1 sec*), the reading
frequency can be set. Note that by default, this value is defined to 0 sec (as
fast as possible).
-## Plc4xSinkProcessor
-
-## Plc4xSourceProcessor
-
## Plc4xSinkRecordProcessor
This processor is <ins>record oriented</ins>, reads from a formated input
flowfile content using a Record Reader (for further information see [NiFi
Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).
@@ -124,6 +136,7 @@ An *example* for reading values from a S7-1200:
- *PLC connection String:*
*s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
- *Read timeout (miliseconds):* *10000*
+- *Timestamp field name:* *timestamp*
- *var1:* *%DB1:DBX0.0:BOOL*
- *var2:* *%DB1:DBX0.1:BOOL*
- *var3:* *%DB1:DBB01:BYTE*
@@ -162,6 +175,6 @@ The output flowfile will contain the PLC read values. This
information is includ
"var3" : "\u0005",
"var5" : 1992,
"var4" : "4",
- "ts" : 1628783058433
+ "timestamp" : 1628783058433
} ]
```
\ No newline at end of file
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index a8da46c46f..b2b05635d8 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -18,36 +18,49 @@
*/
package org.apache.plc4x.nifi;
+import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcDriver;
-import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import org.apache.plc4x.nifi.address.AddressesAccessStrategy;
import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.DynamicPropertyAccessStrategy;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
import org.apache.plc4x.nifi.record.SchemaCache;
public abstract class BasePlc4xProcessor extends AbstractProcessor {
@@ -55,28 +68,31 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
protected List<PropertyDescriptor> properties;
protected Set<Relationship> relationships;
protected volatile boolean debugEnabled;
-
- protected String connectionString;
- protected Map<String, String> addressMap;
- protected Long timeout;
+ protected Integer cacheSize = 0;
protected final SchemaCache schemaCache = new SchemaCache(0);
+ protected AddressesAccessStrategy addressAccessStrategy;
- private final PlcConnectionManager connectionManager =
CachedPlcConnectionManager.getBuilder()
- .withMaxLeaseTime(Duration.ofSeconds(1000L))
- .withMaxWaitTime(Duration.ofSeconds(500L))
- .build();
+ private CachedPlcConnectionManager connectionManager;
- protected static final List<AllowableValue> addressAccessStrategy =
Collections.unmodifiableList(Arrays.asList(
- AddressesAccessUtils.ADDRESS_PROPERTY,
- AddressesAccessUtils.ADDRESS_TEXT));
+ protected CachedPlcConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+
+ protected void refreshConnectionManager() {
+ connectionManager = CachedPlcConnectionManager.getBuilder()
+ .withMaxLeaseTime(Duration.ofSeconds(1000L))
+ .withMaxWaitTime(Duration.ofSeconds(500L))
+ .build();
+ }
- protected static final PropertyDescriptor PLC_CONNECTION_STRING = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PLC_CONNECTION_STRING = new
PropertyDescriptor.Builder()
.name("plc4x-connection-string")
.displayName("PLC connection String")
.description("PLC4X connection string used to connect to a given PLC
device.")
.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new Plc4xConnectionStringValidator())
.build();
@@ -86,6 +102,7 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
.description("Maximum number of entries in the cache. Can
improve performance when addresses change dynamically.")
.defaultValue("1")
.required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
@@ -95,9 +112,20 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
.description( "Request timeout in miliseconds")
.defaultValue("10000")
.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ public static final PropertyDescriptor PLC_TIMESTAMP_FIELD_NAME = new
PropertyDescriptor.Builder()
+ .name("plc4x-timestamp-field-name")
+ .displayName("Timestamp Field Name")
+ .description("Name of the field that will display the timestamp of the
operation.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(new Plc4xTimestampFieldValidator())
+ .defaultValue("ts")
+ .build();
+
protected static final Relationship REL_SUCCESS = new
Relationship.Builder()
.name("success")
@@ -117,8 +145,10 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
properties.add(PLC_CONNECTION_STRING);
properties.add(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY);
properties.add(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+ properties.add(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
properties.add(PLC_SCHEMA_CACHE_SIZE);
properties.add(PLC_FUTURE_TIMEOUT_MILISECONDS);
+ properties.add(PLC_TIMESTAMP_FIELD_NAME);
this.properties = Collections.unmodifiableList(properties);
@@ -129,12 +159,19 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
}
public Map<String, String> getPlcAddressMap(ProcessContext context,
FlowFile flowFile) {
- AddressesAccessStrategy strategy =
AddressesAccessUtils.getAccessStrategy(context);
- return strategy.extractAddresses(context, flowFile);
+ return addressAccessStrategy.extractAddresses(context, flowFile);
}
- public String getConnectionString() {
- return connectionString;
+ public String getConnectionString(ProcessContext context, FlowFile
flowFile) {
+ return
context.getProperty(PLC_CONNECTION_STRING).evaluateAttributeExpressions(flowFile).getValue();
+ }
+
+ public Long getTimeout(ProcessContext context, FlowFile flowFile) {
+ return
context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).evaluateAttributeExpressions(flowFile).asLong();
+ }
+
+ public String getTimestampField(ProcessContext context) {
+ return
context.getProperty(PLC_TIMESTAMP_FIELD_NAME).evaluateAttributeExpressions().getValue();
}
public SchemaCache getSchemaCache() {
@@ -151,14 +188,14 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
return properties;
}
- //dynamic prop
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.dependsOn(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY,
AddressesAccessUtils.ADDRESS_PROPERTY)
+ .addValidator(new
DynamicPropertyAccessStrategy.TagValidator(AddressesAccessUtils.getManager()))
.required(false)
.dynamic(true)
.build();
@@ -167,10 +204,14 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
- connectionString =
context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
-
schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger());
+ Integer newCacheSize =
context.getProperty(PLC_SCHEMA_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+ if (!newCacheSize.equals(cacheSize)){
+ schemaCache.restartCache(newCacheSize);
+ cacheSize = newCacheSize;
+ }
+ refreshConnectionManager();
debugEnabled = getLogger().isDebugEnabled();
- timeout =
context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS.getName()).asLong();
+ addressAccessStrategy =
AddressesAccessUtils.getAccessStrategy(context);
}
@Override
@@ -186,37 +227,162 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
}
BasePlc4xProcessor that = (BasePlc4xProcessor) o;
return Objects.equals(properties, that.properties) &&
- Objects.equals(getRelationships(), that.getRelationships()) &&
- Objects.equals(getConnectionString(), that.getConnectionString());
+ Objects.equals(getRelationships(), that.getRelationships());
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), properties, getRelationships(),
getConnectionString());
+ return Objects.hash(super.hashCode(), properties, getRelationships());
}
- public static class Plc4xConnectionStringValidator implements Validator {
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- // TODO: Add validation here ...
- return new
ValidationResult.Builder().subject(subject).explanation("").valid(true).build();
+ protected PlcWriteRequest getWriteRequest(final ComponentLog logger,
+ final Map<String, String> addressMap, final Map<String, PlcTag>
tags, final Map<String, ? extends Object> presentTags,
+ final PlcConnection connection, final AtomicLong nrOfRowsHere) {
+
+ PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
+
+ if (tags != null){
+ for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+ if (presentTags.containsKey(tag.getKey())) {
+ builder.addTag(tag.getKey(), tag.getValue(),
presentTags.get(tag.getKey()));
+ if (nrOfRowsHere != null) {
+ nrOfRowsHere.incrementAndGet();
+ }
+ } else {
+ if (debugEnabled)
+ logger.debug("PlcTag " + tag + " is declared as
address but was not found on input record.");
+ }
+ }
+ } else {
+ if (debugEnabled)
+ logger.debug("PlcTypes resolution not found in cache and will
be added with key: " + addressMap);
+ for (Map.Entry<String,String> entry: addressMap.entrySet()){
+ if (presentTags.containsKey(entry.getKey())) {
+ builder.addTagAddress(entry.getKey(), entry.getValue(),
presentTags.get(entry.getKey()));
+ if (nrOfRowsHere != null) {
+ nrOfRowsHere.incrementAndGet();
+ }
+ }
+ }
+ }
+
+ return builder.build();
+ }
+
+ protected PlcReadRequest getReadRequest(final ComponentLog logger,
+ final Map<String, String> addressMap, final Map<String, PlcTag>
tags,
+ final PlcConnection connection) {
+
+ PlcReadRequest.Builder builder = connection.readRequestBuilder();
+
+ if (tags != null){
+ for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+ builder.addTag(tag.getKey(), tag.getValue());
+ }
+ } else {
+ if (debugEnabled)
+ logger.debug("Plc-Avro schema and PlcTypes resolution not
found in cache and will be added with key: " + addressMap);
+ for (Map.Entry<String,String> entry: addressMap.entrySet()){
+ builder.addTagAddress(entry.getKey(), entry.getValue());
+ }
+ }
+ return builder.build();
+ }
+
+ protected void evaluateWriteResponse(final ComponentLog logger,
Map<String, ? extends Object> values, PlcWriteResponse plcWriteResponse) {
+
+ boolean codeErrorPresent = false;
+ List<String> tagsAtError = null;
+
+ PlcResponseCode code = null;
+
+ for (String tag : plcWriteResponse.getTagNames()) {
+ code = plcWriteResponse.getResponseCode(tag);
+ if (!code.equals(PlcResponseCode.OK)) {
+ if (tagsAtError == null) {
+ tagsAtError = new ArrayList<>();
+ }
+ logger.error("Not OK code when writing the data
to PLC for tag " + tag
+ + " with value " +
values.get(tag).toString()
+ + " in addresss " +
plcWriteResponse.getTag(tag).getAddressString());
+
+ codeErrorPresent = true;
+ tagsAtError.add(tag);
+
+ }
+ }
+ if (codeErrorPresent) {
+ throw new ProcessException("At least one error was
found when while writting tags: " + tagsAtError.toString());
+ }
+ }
+
+ protected void evaluateReadResponse(final ProcessSession session, final
FlowFile flowFile, final PlcReadResponse response) {
+ Map<String, String> attributes = new HashMap<>();
+ for (String tagName : response.getTagNames()) {
+ for (int i = 0; i < response.getNumberOfValues(tagName); i++) {
+ Object value = response.getObject(tagName, i);
+ attributes.put(tagName, String.valueOf(value));
+ }
}
+ session.putAllAttributes(flowFile, attributes);
}
- public static class Plc4xAddressStringValidator implements Validator {
+ protected long evaluateReadResponse(final ProcessContext context, final
ComponentLog logger, final FlowFile originalFlowFile,
+ Plc4xWriter plc4xWriter, OutputStream out, final
RecordSchema recordSchema, PlcReadResponse readResponse)
+ throws Exception {
+
+ if(originalFlowFile == null) //there is no inherit attributes
to use in writer service
+ return plc4xWriter.writePlcReadResponse(readResponse,
out, logger, null, recordSchema, getTimestampField(context));
+ else
+ return plc4xWriter.writePlcReadResponse(readResponse,
out, logger, null, recordSchema, originalFlowFile, getTimestampField(context));
+ }
+
+ protected static class Plc4xConnectionStringValidator implements Validator
{
@Override
public ValidationResult validate(String subject, String input,
ValidationContext context) {
- // TODO: Add validation here ...
- return new
ValidationResult.Builder().subject(subject).explanation("").valid(true).build();
+ DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
+
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
+ return new
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
+ }
+ try {
+ PlcDriver driver = manager.getDriverForUrl(input);
+ driver.getConnection(input);
+ } catch (PlcConnectionException e) {
+ return new ValidationResult.Builder().subject(subject)
+ .explanation(e.getMessage())
+ .valid(false)
+ .build();
+ }
+ return new ValidationResult.Builder().subject(subject)
+ .explanation("")
+ .valid(true)
+ .build();
}
}
- protected PlcConnectionManager getConnectionManager() {
- return connectionManager;
- }
+ protected static class Plc4xTimestampFieldValidator implements Validator {
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
- protected PlcDriver getDriver() throws PlcConnectionException {
- return PlcDriverManager.getDefault().getDriverForUrl(connectionString);
- }
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
+ return new
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
+ }
+
+ Map<String, String> allProperties = context.getAllProperties();
+ allProperties.remove(subject);
+
+ if (allProperties.containsValue(input)) {
+ return new ValidationResult.Builder().subject(subject)
+ .explanation("Timestamp field must be unique")
+ .valid(false)
+ .build();
+ }
+ return new ValidationResult.Builder().subject(subject)
+ .explanation("")
+ .valid(true)
+ .build();
+ }
+ }
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
index 0664fa0015..4b6e47b719 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -85,6 +84,7 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
protected Plc4xListenerDispatcher dispatcher;
protected RecordSchema recordSchema;
protected Thread readerThread;
+ protected Map<String, String> addressMap;
final StopWatch executeTime = new StopWatch(false);
public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new
PropertyDescriptor.Builder()
@@ -112,21 +112,7 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
.dependsOn(PLC_SUBSCRIPTION_TYPE,
Plc4xSubscriptionType.CYCLIC.name())
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .addValidator(new Validator() {
- @Override
- public ValidationResult validate(String subject, String
input, ValidationContext context) {
- if
(context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() >
Long.valueOf(input)) {
- return new
ValidationResult.Builder().valid(true).build();
- } else {
- return new ValidationResult.Builder()
- .valid(false)
- .input(input)
-
.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
- .explanation(String.format("it must me
smaller than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
- .build();
- }
- }
- })
+ .addValidator(new CyclycPollingIntervalValidator())
.defaultValue("10000")
.build();
@@ -151,20 +137,19 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
super.onScheduled(context);
subscriptionType =
Plc4xSubscriptionType.valueOf(context.getProperty(PLC_SUBSCRIPTION_TYPE).getValue());
cyclingPollingInterval =
context.getProperty(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL).asLong();
- addressMap = getPlcAddressMap(context, null);
-
- createDispatcher(events);
+ createDispatcher(context, events);
}
- protected void createDispatcher(final BlockingQueue<PlcSubscriptionEvent>
events) {
+ protected void createDispatcher(final ProcessContext context, final
BlockingQueue<PlcSubscriptionEvent> events) {
if (readerThread != null) {
return;
}
// create the dispatcher and calls open() to start listening to
the plc subscription
- dispatcher = new Plc4xListenerDispatcher(timeout, subscriptionType,
cyclingPollingInterval, getLogger(), events);
+ dispatcher = new Plc4xListenerDispatcher(getTimeout(context, null),
subscriptionType, cyclingPollingInterval, getLogger(), events);
try {
- dispatcher.open(getConnectionString(), addressMap);
+ addressMap = getPlcAddressMap(context, null);
+ dispatcher.open(getConnectionString(context, null),
addressMap);
} catch (Exception e) {
if (debugEnabled) {
getLogger().debug("Error creating a the
subscription event dispatcher");
@@ -198,7 +183,7 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
}
}
- protected PlcSubscriptionEvent getMessage() {
+ protected PlcSubscriptionEvent getMessage(final ProcessContext context)
{
if (readerThread != null && readerThread.isAlive()) {
return events.poll();
@@ -208,14 +193,14 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
getLogger().debug("Connection to Plc broke. Trying to
restart connection");
}
closeDispatcher();
- createDispatcher(events);
+ createDispatcher(context, events);
throw new ProcessException("Connection to Plc broke. Trying to
restart connection");
}
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
- DefaultPlcSubscriptionEvent event =
(DefaultPlcSubscriptionEvent) getMessage();
+ DefaultPlcSubscriptionEvent event =
(DefaultPlcSubscriptionEvent) getMessage(context);
if (event == null) {
return;
@@ -226,70 +211,25 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
final AtomicLong nrOfRows = new AtomicLong(0L);
- FlowFile resultSetFF;
- resultSetFF = session.create();
+ FlowFile resultSetFF = session.create();
Plc4xWriter plc4xWriter = new
RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
Collections.emptyMap());
try {
- resultSetFF = session.write(resultSetFF, out -> {
+ session.write(resultSetFF, out -> {
try {
-
nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null,
recordSchema));
+
nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null,
recordSchema, getTimestampField(context)));
} catch (Exception e) {
getLogger().error("Exception reading
the data from PLC", e);
throw (e instanceof ProcessException) ?
(ProcessException) e : new ProcessException(e);
}
if (recordSchema == null){
- if (debugEnabled)
- getLogger().debug("Adding
Plc-Avro schema and PlcTypes resolution into cache with key: " +
addressMap.toString());
-
- // Add schema to the cache
- LinkedHashSet<String> addressNames =
new LinkedHashSet<String>();
-
addressNames.addAll(event.getTagNames());
-
- List<PlcTag> addressTags =
addressNames.stream().map(
- new Function<String,PlcTag>() {
- @Override
- public PlcTag
apply(String addr) {
- return new
PlcTag() {
-
@Override
- public
String getAddressString() {
-
return addr;
- }
-
-
@Override
- public
PlcValueType getPlcValueType() {
-
return event.getPlcValue(addr).getPlcValueType();
- }
- };
- }
-
}).collect(Collectors.toList());
-
- getSchemaCache().addSchema(
- addressMap,
- addressNames,
- addressTags,
- plc4xWriter.getRecordSchema()
- );
- recordSchema =
getSchemaCache().retrieveSchema(addressMap);
+ addTagsToCache(event, plc4xWriter);
}
});
- long executionTimeElapsed =
executeTime.getElapsed(TimeUnit.MILLISECONDS);
- executeTime.stop();
-
- final Map<String, String> attributesToAdd = new
HashMap<>();
- attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
- attributesToAdd.put(RESULT_LAST_EVENT,
String.valueOf(executionTimeElapsed));
-
-
attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
- resultSetFF = session.putAllAttributes(resultSetFF,
attributesToAdd);
- plc4xWriter.updateCounters(session);
- getLogger().info("{} contains {} records; transferring
to 'success'", resultSetFF, nrOfRows.get());
-
- session.getProvenanceReporter().receive(resultSetFF,
"Retrieved " + nrOfRows.get() + " rows from subscription",
executionTimeElapsed);
+ resultSetFF = completeResultFlowFile(session, nrOfRows,
resultSetFF, plc4xWriter);
session.transfer(resultSetFF, REL_SUCCESS);
- session.commitAsync();
executeTime.start();
@@ -298,4 +238,71 @@ public class Plc4xListenRecordProcessor extends
BasePlc4xProcessor {
throw new ProcessException("Got an error while trying
to get a subscription event", e);
}
}
+
+ private void addTagsToCache(DefaultPlcSubscriptionEvent event,
Plc4xWriter plc4xWriter) {
+ if (debugEnabled)
+ getLogger().debug("Adding Plc-Avro schema and PlcTypes
resolution into cache with key: " + addressMap.toString());
+
+ // Add schema to the cache
+ LinkedHashSet<String> addressNames = new LinkedHashSet<>();
+ addressNames.addAll(event.getTagNames());
+
+ List<PlcTag> addressTags = addressNames.stream().map(addr ->
+ new PlcTag() {
+ @Override
+ public String getAddressString() {
+ return addr;
+ }
+
+ @Override
+ public PlcValueType getPlcValueType() {
+ return
event.getPlcValue(addr).getPlcValueType();
+ }
+ }
+ ).collect(Collectors.toList());
+
+ getSchemaCache().addSchema(
+ addressMap,
+ addressNames,
+ addressTags,
+ plc4xWriter.getRecordSchema()
+ );
+ recordSchema = getSchemaCache().retrieveSchema(addressMap);
+ }
+
+ private FlowFile completeResultFlowFile(final ProcessSession session,
final AtomicLong nrOfRows, FlowFile resultSetFF,
+ Plc4xWriter plc4xWriter) {
+
+ long executionTimeElapsed =
executeTime.getElapsed(TimeUnit.MILLISECONDS);
+ executeTime.stop();
+
+ final Map<String, String> attributesToAdd = new HashMap<>();
+ attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
+ attributesToAdd.put(RESULT_LAST_EVENT,
String.valueOf(executionTimeElapsed));
+
+ attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+ resultSetFF = session.putAllAttributes(resultSetFF,
attributesToAdd);
+ plc4xWriter.updateCounters(session);
+ getLogger().info("{} contains {} records; transferring to
'success'", resultSetFF, nrOfRows.get());
+
+ session.getProvenanceReporter().receive(resultSetFF, "Retrieved
" + nrOfRows.get() + " rows from subscription", executionTimeElapsed);
+ return resultSetFF;
+ }
+
+
+ protected static class CyclycPollingIntervalValidator implements
Validator {
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ if
(context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() >
Long.valueOf(input)) {
+ return new
ValidationResult.Builder().valid(true).build();
+ } else {
+ return new ValidationResult.Builder()
+ .valid(false)
+ .input(input)
+
.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
+ .explanation(String.format("it must me smaller
than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
+ .build();
+ }
+ }
+ }
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 9c1cb758e7..f18b35e893 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.nifi;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -36,7 +37,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.model.PlcTag;
@TriggerSerially
@@ -47,83 +47,62 @@ import org.apache.plc4x.java.api.model.PlcTag;
@ReadsAttributes({@ReadsAttribute(attribute="value", description="some
value")})
public class Plc4xSinkProcessor extends BasePlc4xProcessor {
+ public static final String EXCEPTION = "plc4x.write.exception";
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
FlowFile flowFile = session.get();
- final ComponentLog logger = getLogger();
-
+
// Abort if there's nothing to do.
if (flowFile == null) {
return;
}
- // Get an instance of a component able to write to a PLC.
- try(PlcConnection connection =
getConnectionManager().getConnection(getConnectionString())) {
+ final ComponentLog logger = getLogger();
+
+ try(PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context, flowFile))) {
if (!connection.getMetadata().canWrite()) {
throw new ProcessException("Writing not supported by
connection");
}
-
- // Prepare the request.
- PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
- Map<String,String> addressMap = getPlcAddressMap(context,
flowFile);
+
+ final Map<String,String> addressMap = getPlcAddressMap(context,
flowFile);
final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
- if (tags != null){
- for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
- if (flowFile.getAttributes().containsKey(tag.getKey())) {
- builder.addTag(tag.getKey(), tag.getValue(),
flowFile.getAttribute(tag.getKey()));
- } else {
- if (debugEnabled)
- logger.debug("PlcTag " + tag + " is declared as
address but was not found on input record.");
- }
- }
- } else {
- for (Map.Entry<String,String> entry: addressMap.entrySet()){
- if (flowFile.getAttributes().containsKey(entry.getKey())) {
- builder.addTagAddress(entry.getKey(),
entry.getValue(), flowFile.getAttribute(entry.getKey()));
- }
- }
- if (debugEnabled)
- logger.debug("PlcTypes resolution not found in cache and
will be added with key: " + addressMap);
- }
-
- PlcWriteRequest writeRequest = builder.build();
+ PlcWriteRequest writeRequest = getWriteRequest(logger, addressMap,
tags, flowFile.getAttributes(), connection, null);
- // Send the request to the PLC.
try {
- final PlcWriteResponse plcWriteResponse =
writeRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
- PlcResponseCode code = null;
-
- for (String tag : plcWriteResponse.getTagNames()) {
- code = plcWriteResponse.getResponseCode(tag);
- if (!code.equals(PlcResponseCode.OK)) {
- logger.error("Not OK code when writing the data to PLC
for tag " + tag
- + " with value
" + flowFile.getAttribute(tag)
- + " in addresss
" + plcWriteResponse.getTag(tag).getAddressString());
- throw new Exception(code.toString());
- }
- }
- session.transfer(flowFile, REL_SUCCESS);
+ final PlcWriteResponse plcWriteResponse =
writeRequest.execute().get(getTimeout(context, flowFile),
TimeUnit.MILLISECONDS);
- if (tags == null){
- if (debugEnabled)
- logger.debug("Adding PlcTypes resolution into cache
with key: " + addressMap);
- getSchemaCache().addSchema(
- addressMap,
- writeRequest.getTagNames(),
- writeRequest.getTags(),
- null
- );
- }
+ evaluateWriteResponse(logger, flowFile.getAttributes(),
plcWriteResponse);
+
+ } catch (TimeoutException e) {
+ logger.error("Timeout writing the data to the PLC", e);
+
getConnectionManager().removeCachedConnection(getConnectionString(context,
flowFile));
+ throw new ProcessException(e);
} catch (Exception e) {
- flowFile = session.putAttribute(flowFile, "exception",
e.getLocalizedMessage());
- session.transfer(flowFile, REL_FAILURE);
+ logger.error("Exception writing the data to the PLC", e);
+ throw (e instanceof ProcessException) ? (ProcessException) e :
new ProcessException(e);
}
- } catch (ProcessException e) {
- throw e;
+ session.transfer(flowFile, REL_SUCCESS);
+
+ if (tags == null){
+ if (debugEnabled)
+ logger.debug("Adding PlcTypes resolution into cache with
key: " + addressMap);
+ getSchemaCache().addSchema(
+ addressMap,
+ writeRequest.getTagNames(),
+ writeRequest.getTags(),
+ null
+ );
+ }
+
+
} catch (Exception e) {
- throw new ProcessException("Got an error while trying to get a
connection", e);
+ flowFile = session.putAttribute(flowFile, EXCEPTION,
e.getLocalizedMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ session.commitAsync();
+ throw (e instanceof ProcessException) ? (ProcessException) e : new
ProcessException(e);
}
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
index 7f58803b45..661f919266 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
@@ -18,7 +18,6 @@
*/
package org.apache.plc4x.nifi;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -27,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -52,10 +52,10 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcTag;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
@TriggerSerially
@Tags({"plc4x", "put", "sink", "record"})
@@ -73,6 +73,7 @@ public class Plc4xSinkRecordProcessor extends
BasePlc4xProcessor {
public static final String RESULT_ROW_COUNT = "plc4x.write.row.count";
public static final String RESULT_QUERY_EXECUTION_TIME =
"plc4x.write.query.executiontime";
public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+ public static final String EXCEPTION = "plc4x.write.exception";
public static final PropertyDescriptor PLC_RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
.name("record-reader").displayName("Record Reader")
@@ -105,116 +106,90 @@ public class Plc4xSinkRecordProcessor extends
BasePlc4xProcessor {
}
final ComponentLog logger = getLogger();
+
// Get an instance of a component able to read from a PLC.
final AtomicLong nrOfRows = new AtomicLong(0L);
final StopWatch executeTime = new StopWatch(true);
- final FlowFile originalFlowFile = fileToProcess;
-
- InputStream in = session.read(originalFlowFile);
-
- Record record = null;
-
- try (RecordReader recordReader =
context.getProperty(PLC_RECORD_READER_FACTORY)
- .asControllerService(RecordReaderFactory.class)
- .createRecordReader(originalFlowFile, in, logger)){
-
- while ((record = recordReader.nextRecord()) != null) {
- long nrOfRowsHere = 0L;
- PlcWriteResponse plcWriteResponse;
- PlcWriteRequest writeRequest;
-
- Map<String,String> addressMap =
getPlcAddressMap(context, fileToProcess);
- final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
-
- try (PlcConnection connection =
getConnectionManager().getConnection(getConnectionString())) {
- PlcWriteRequest.Builder builder =
connection.writeRequestBuilder();
-
-
- if (tags != null){
- for (Map.Entry<String,PlcTag>
tag : tags.entrySet()){
- if
(record.toMap().containsKey(tag.getKey())) {
-
builder.addTag(tag.getKey(), tag.getValue(), record.getValue(tag.getKey()));
- nrOfRowsHere++;
- } else {
- if
(debugEnabled)
- logger.debug("PlcTag " + tag +
" is declared as address but was not found on input record.");
- }
+ try {
+ session.read(fileToProcess, in -> {
+ Record record = null;
+
+ try (RecordReader recordReader =
context.getProperty(PLC_RECORD_READER_FACTORY)
+
.asControllerService(RecordReaderFactory.class)
+ .createRecordReader(fileToProcess, in,
logger)){
+
+ while ((record =
recordReader.nextRecord()) != null) {
+ AtomicLong nrOfRowsHere = new
AtomicLong(0);
+ PlcWriteRequest writeRequest;
+
+ final Map<String,String>
addressMap = getPlcAddressMap(context, fileToProcess);
+ final Map<String, PlcTag> tags
= getSchemaCache().retrieveTags(addressMap);
+
+ try (PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
fileToProcess))) {
+
+ writeRequest =
getWriteRequest(logger, addressMap, tags, record.toMap(), connection,
nrOfRowsHere);
+
+ PlcWriteResponse
plcWriteResponse = writeRequest.execute().get(getTimeout(context,
fileToProcess), TimeUnit.MILLISECONDS);
+
+ // Check the response to confirm the
values were written
+
evaluateWriteResponse(logger, record.toMap(), plcWriteResponse);
+
+ } catch (TimeoutException e) {
+ logger.error("Timeout
writing the data to the PLC", e);
+
getConnectionManager().removeCachedConnection(getConnectionString(context,
fileToProcess));
+ throw new
ProcessException(e);
+ } catch (PlcConnectionException
e) {
+ logger.error("Error
getting the PLC connection", e);
+ throw new
ProcessException("Got a PlcConnectionException while trying to get a
connection", e);
+ } catch (Exception e) {
+ logger.error("Exception
writing the data to the PLC", e);
+ throw (e instanceof
ProcessException) ? (ProcessException) e : new ProcessException(e);
}
- } else {
- if (debugEnabled)
- logger.debug("Plc-Avro schema and
PlcTypes resolution not found in cache and will be added with key: " +
addressMap);
- for (Map.Entry<String,String>
entry: addressMap.entrySet()){
- if
(record.toMap().containsKey(entry.getKey())) {
-
builder.addTagAddress(entry.getKey(), entry.getValue(),
record.getValue(entry.getKey()));
- nrOfRowsHere++;
- } else {
- if
(debugEnabled)
- logger.debug("PlcTag " +
entry.getKey() + " with address " + entry.getValue() + " was not found on input
record.");
- }
+
+ if (tags == null){
+ if (debugEnabled)
+
logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
+
getSchemaCache().addSchema(
+ addressMap,
+
writeRequest.getTagNames(),
+
writeRequest.getTags(),
+ null
+ );
}
- }
- writeRequest = builder.build();
-
- plcWriteResponse =
writeRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
+
nrOfRows.getAndAdd(nrOfRowsHere.get());
+
+ }
} catch (Exception e) {
- in.close();
- logger.error("Exception writing the
data to PLC", e);
- session.transfer(originalFlowFile,
REL_FAILURE);
- session.commitAsync();
throw (e instanceof ProcessException) ?
(ProcessException) e : new ProcessException(e);
- }
-
- // Response check if values were written
- if (plcWriteResponse != null){
- PlcResponseCode code = null;
-
- for (String tag :
plcWriteResponse.getTagNames()) {
- code =
plcWriteResponse.getResponseCode(tag);
- if
(!code.equals(PlcResponseCode.OK)) {
- logger.error("Not OK
code when writing the data to PLC for tag " + tag
- + " with value
" + record.getValue(tag).toString()
- + " in addresss
" + plcWriteResponse.getTag(tag).getAddressString());
- throw new
ProcessException("Writing response code for " +
plcWriteResponse.getTag(tag).getAddressString() + "was " + code.name() + ",
expected OK");
- }
- }
- if (tags == null && writeRequest !=
null){
- if (debugEnabled)
- logger.debug("Adding
PlcTypes resolution into cache with key: " + addressMap);
- getSchemaCache().addSchema(
- addressMap,
-
writeRequest.getTagNames(),
- writeRequest.getTags(),
- null
- );
- }
- nrOfRows.getAndAdd(nrOfRowsHere);
- }
- }
- in.close();
- } catch (Exception e) {
- throw new ProcessException(e);
+ }
+ });
+
+ } catch (ProcessException e) {
+ logger.error("Exception writing the data to the PLC",
e);
+ session.putAttribute(fileToProcess, EXCEPTION,
e.getLocalizedMessage());
+ session.transfer(fileToProcess, REL_FAILURE);
+ session.commitAsync();
+ throw e;
}
-
+
+
long executionTimeElapsed =
executeTime.getElapsed(TimeUnit.MILLISECONDS);
final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME,
String.valueOf(executionTimeElapsed));
attributesToAdd.put(INPUT_FLOWFILE_UUID,
fileToProcess.getAttribute(CoreAttributes.UUID.key()));
- FlowFile resultSetFF =
session.putAllAttributes(originalFlowFile, attributesToAdd);
+ session.putAllAttributes(fileToProcess, attributesToAdd);
- logger.info("Writing {} fields from {} records; transferring to
'success'", new Object[] { nrOfRows.get(), resultSetFF });
- // Report a FETCH event if there was an incoming flow file, or
a RECEIVE event
- // otherwise
+ session.transfer(fileToProcess, REL_SUCCESS);
+
+ logger.info("Writing {} fields from {} records; transferring to
'success'", nrOfRows.get(), fileToProcess);
if (context.hasIncomingConnection()) {
- session.getProvenanceReporter().fetch(resultSetFF,
"Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
+ session.getProvenanceReporter().fetch(fileToProcess,
"Wrote " + nrOfRows.get() + " rows", executionTimeElapsed);
} else {
- session.getProvenanceReporter().receive(resultSetFF,
"Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
+ session.getProvenanceReporter().receive(fileToProcess,
"Wrote " + nrOfRows.get() + " rows", executionTimeElapsed);
}
-
- session.transfer(resultSetFF, BasePlc4xProcessor.REL_SUCCESS);
- session.commitAsync();
}
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 0f57de46d0..a6f2da2e17 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -18,10 +18,9 @@
*/
package org.apache.plc4x.nifi;
-import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,70 +45,74 @@ import org.apache.plc4x.java.api.model.PlcTag;
@WritesAttributes({@WritesAttribute(attribute="value", description="some
value")})
public class Plc4xSourceProcessor extends BasePlc4xProcessor {
+ public static final String EXCEPTION = "plc4x.read.exception";
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+
+ FlowFile incomingFlowFile = null;
+ if (context.hasIncomingConnection()) {
+ incomingFlowFile = session.get();
+ if (incomingFlowFile == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
final ComponentLog logger = getLogger();
- // Get an instance of a component able to read from a PLC.
- try(PlcConnection connection =
getConnectionManager().getConnection(getConnectionString())) {
+ final FlowFile flowFile = session.create();
+
+ try(PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
incomingFlowFile))) {
- // Prepare the request.
if (!connection.getMetadata().canRead()) {
- throw new ProcessException("Writing not supported by
connection");
+ throw new ProcessException("Reading not supported by
connection");
}
- FlowFile flowFile = session.create();
- try {
- PlcReadRequest.Builder builder =
connection.readRequestBuilder();
- Map<String,String> addressMap = getPlcAddressMap(context,
flowFile);
- final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
+ final Map<String,String> addressMap = getPlcAddressMap(context,
incomingFlowFile);
+ final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
- if (tags != null){
- for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
- builder.addTag(tag.getKey(), tag.getValue());
- }
- } else {
- if (debugEnabled)
- logger.debug("PlcTypes resolution not found in cache
and will be added with key: " + addressMap);
- for (Map.Entry<String,String> entry:
addressMap.entrySet()){
- builder.addTagAddress(entry.getKey(),
entry.getValue());
- }
- }
- PlcReadRequest readRequest = builder.build();
- PlcReadResponse response =
readRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
- Map<String, String> attributes = new HashMap<>();
- for (String tagName : response.getTagNames()) {
- for (int i = 0; i < response.getNumberOfValues(tagName);
i++) {
- Object value = response.getObject(tagName, i);
- attributes.put(tagName, String.valueOf(value));
- }
- }
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- if (tags == null){
- if (debugEnabled)
- logger.debug("Adding PlcTypes resolution into cache
with key: " + addressMap);
- getSchemaCache().addSchema(
- addressMap,
- readRequest.getTagNames(),
- readRequest.getTags(),
- null
- );
- }
+ PlcReadRequest readRequest = getReadRequest(logger, addressMap,
tags, connection);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessException(e);
- } catch (ExecutionException e) {
+ try {
+ final PlcReadResponse response =
readRequest.execute().get(getTimeout(context, incomingFlowFile),
TimeUnit.MILLISECONDS);
+
+ evaluateReadResponse(session, flowFile, response);
+
+ } catch (TimeoutException e) {
+ logger.error("Timeout reading the data from PLC", e);
+
getConnectionManager().removeCachedConnection(getConnectionString(context,
incomingFlowFile));
throw new ProcessException(e);
+ } catch (Exception e) {
+ logger.error("Exception reading the data from PLC", e);
+ throw (e instanceof ProcessException) ? (ProcessException) e :
new ProcessException(e);
+ }
+
+
+ if (incomingFlowFile != null) {
+ session.remove(incomingFlowFile);
}
session.transfer(flowFile, REL_SUCCESS);
- } catch (ProcessException e) {
- throw e;
+
+ if (tags == null){
+ if (debugEnabled)
+ logger.debug("Adding PlcTypes resolution into cache with
key: " + addressMap);
+ getSchemaCache().addSchema(
+ addressMap,
+ readRequest.getTagNames(),
+ readRequest.getTags(),
+ null
+ );
+ }
+
} catch (Exception e) {
- throw new ProcessException("Got an error while trying to get a
connection", e);
+ session.remove(flowFile);
+ if (incomingFlowFile != null){
+ incomingFlowFile = session.putAttribute(incomingFlowFile,
EXCEPTION, e.getLocalizedMessage());
+ session.transfer(incomingFlowFile, REL_FAILURE);
+ }
+ session.commitAsync();
+ throw (e instanceof ProcessException) ? (ProcessException) e : new
ProcessException(e);
}
}
-
+
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 6d2e5c246e..c2e53e7625 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -35,7 +35,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -70,8 +69,10 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
public static final String RESULT_QUERY_EXECUTION_TIME =
"plc4x.read.query.executiontime";
public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+ public static final String EXCEPTION = "plc4x.read.exception";
- public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new
PropertyDescriptor.Builder().name("plc4x-record-writer").displayName("Record
Writer")
+ public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new
PropertyDescriptor.Builder()
+ .name("plc4x-record-writer").displayName("Record Writer")
.description("Specifies the Controller Service to use for
writing results to a FlowFile. The Record Writer may use Inherit Schema to
emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in
the writer, and will be supplied by the same logic used to infer the schema
from the column types.")
.identifiesControllerService(RecordSetWriterFactory.class)
@@ -89,130 +90,114 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
this.properties = Collections.unmodifiableList(pds);
}
- @OnScheduled
- @Override
- public void onScheduled(final ProcessContext context) {
- super.onScheduled(context);
- }
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
FlowFile fileToProcess = null;
- // TODO: In the future the processor will be configurable to
get the connection from incoming flowfile
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
- // If we have no FlowFile, and all incoming connections
are self-loops then we
- // can continue on.
- // However, if we have no FlowFile and we have
connections coming from other
- // Processors, then we know that we should run only if
we have a FlowFile.
+
if (fileToProcess == null &&
context.hasNonLoopConnection()) {
return;
}
}
-
- Plc4xWriter plc4xWriter = new
RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
fileToProcess == null ? Collections.emptyMap() :
fileToProcess.getAttributes());
+
final ComponentLog logger = getLogger();
+
+
// Get an instance of a component able to read from a PLC.
- // TODO: Change this to use NiFi service instead of direct
connection
final AtomicLong nrOfRows = new AtomicLong(0L);
final StopWatch executeTime = new StopWatch(true);
- String inputFileUUID = fileToProcess == null ? null :
fileToProcess.getAttribute(CoreAttributes.UUID.key());
- Map<String, String> inputFileAttrMap = fileToProcess == null ?
null : fileToProcess.getAttributes();
- FlowFile resultSetFF;
+ final FlowFile resultSetFF;
if (fileToProcess == null) {
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
- }
- if (inputFileAttrMap != null) {
- resultSetFF = session.putAllAttributes(resultSetFF,
inputFileAttrMap);
+ session.putAttribute(resultSetFF, INPUT_FLOWFILE_UUID,
fileToProcess.getAttribute(CoreAttributes.UUID.key()));
}
- try (PlcConnection connection =
getConnectionManager().getConnection(getConnectionString())) {
- PlcReadRequest.Builder builder =
connection.readRequestBuilder();
- Map<String,String> addressMap =
getPlcAddressMap(context, fileToProcess);
- final RecordSchema recordSchema =
getSchemaCache().retrieveSchema(addressMap);
- final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
+ final FlowFile originalFlowFile = fileToProcess;
- if (tags != null){
- for (Map.Entry<String,PlcTag> tag :
tags.entrySet()){
- builder.addTag(tag.getKey(),
tag.getValue());
- }
- } else {
- if (debugEnabled)
- logger.debug("Plc-Avro schema and PlcTypes resolution not
found in cache and will be added with key: " + addressMap);
- for (Map.Entry<String,String> entry:
addressMap.entrySet()){
- builder.addTagAddress(entry.getKey(),
entry.getValue());
- }
- }
-
- PlcReadRequest readRequest = builder.build();
- final FlowFile originalFlowFile = fileToProcess;
- resultSetFF = session.write(resultSetFF, out -> {
- try {
- PlcReadResponse readResponse =
readRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
+ Plc4xWriter plc4xWriter = new
RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
+ fileToProcess == null ? Collections.emptyMap() :
fileToProcess.getAttributes());
+
+
+ try {
+ session.write(resultSetFF, out -> {
+ final Map<String,String> addressMap =
getPlcAddressMap(context, originalFlowFile);
+ final RecordSchema recordSchema =
getSchemaCache().retrieveSchema(addressMap);
+ final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
+ PlcReadRequest readRequest;
+ Long nrOfRowsHere;
+
+ try (PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
originalFlowFile))) {
- if(originalFlowFile == null) //there is
no inherit attributes to use in writer service
-
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null,
recordSchema));
- else
-
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null,
recordSchema, originalFlowFile));
- } catch (InterruptedException e) {
- logger.error("InterruptedException
reading the data from PLC", e);
- Thread.currentThread().interrupt();
- throw new ProcessException(e);
+ readRequest = getReadRequest(logger,
addressMap, tags, connection);
+
+ PlcReadResponse readResponse =
readRequest.execute().get(getTimeout(context, originalFlowFile),
TimeUnit.MILLISECONDS);
+
+ nrOfRowsHere =
evaluateReadResponse(context, logger, originalFlowFile, plc4xWriter, out,
recordSchema, readResponse);
+
} catch (TimeoutException e) {
logger.error("Timeout reading the data
from PLC", e);
+
getConnectionManager().removeCachedConnection(getConnectionString(context,
originalFlowFile));
throw new ProcessException(e);
+ } catch (PlcConnectionException e) {
+ logger.error("Error getting the PLC
connection", e);
+ throw new ProcessException("Got a
PlcConnectionException while trying to get a connection", e);
} catch (Exception e) {
logger.error("Exception reading the
data from PLC", e);
throw (e instanceof ProcessException) ?
(ProcessException) e : new ProcessException(e);
}
- });
- if (recordSchema == null){
- if (debugEnabled)
- logger.debug("Adding Plc-Avro schema and PlcTypes
resolution into cache with key: " + addressMap);
- getSchemaCache().addSchema(
- addressMap,
- readRequest.getTagNames(),
- readRequest.getTags(),
- plc4xWriter.getRecordSchema()
- );
- }
- long executionTimeElapsed =
executeTime.getElapsed(TimeUnit.MILLISECONDS);
- final Map<String, String> attributesToAdd = new
HashMap<>();
- attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
- attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME,
String.valueOf(executionTimeElapsed));
- if (inputFileUUID != null) {
- attributesToAdd.put(INPUT_FLOWFILE_UUID,
inputFileUUID);
- }
-
attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
- resultSetFF = session.putAllAttributes(resultSetFF,
attributesToAdd);
- plc4xWriter.updateCounters(session);
- logger.info("{} contains {} records; transferring to
'success'", new Object[] { resultSetFF, nrOfRows.get() });
- // Report a FETCH event if there was an incoming flow
file, or a RECEIVE event
- // otherwise
- if (context.hasIncomingConnection()) {
-
session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " +
nrOfRows.get() + " rows", executionTimeElapsed);
- } else {
-
session.getProvenanceReporter().receive(resultSetFF, "Retrieved " +
nrOfRows.get() + " rows", executionTimeElapsed);
- }
+ if (recordSchema == null){
+ if (debugEnabled)
+ logger.debug("Adding PlcTypes
resolution into cache with key: " + addressMap);
+ getSchemaCache().addSchema(
+ addressMap,
+ readRequest.getTagNames(),
+ readRequest.getTags(),
+ plc4xWriter.getRecordSchema()
+ );
+ }
+ nrOfRows.set(nrOfRowsHere);
+
+ });
- session.transfer(resultSetFF,
BasePlc4xProcessor.REL_SUCCESS);
- // Need to remove the original input file if it exists
+ } catch (Exception e) {
+ logger.error("Exception reading the data from the PLC",
e);
if (fileToProcess != null) {
- session.remove(fileToProcess);
- fileToProcess = null;
+ session.putAttribute(fileToProcess, EXCEPTION,
e.getLocalizedMessage());
+ session.transfer(fileToProcess, REL_FAILURE);
}
+ session.remove(resultSetFF);
session.commitAsync();
-
- } catch (PlcConnectionException e) {
- logger.error("Error getting the PLC connection", e);
- throw new ProcessException("Got an a
PlcConnectionException while trying to get a connection", e);
- } catch (Exception e) {
- logger.error("Got an error while trying to get a
connection", e);
- throw new ProcessException("Got an error while trying
to get a connection", e);
+ throw (e instanceof ProcessException) ?
(ProcessException) e : new ProcessException(e);
+ }
+
+ plc4xWriter.updateCounters(session);
+ long executionTimeElapsed =
executeTime.getElapsed(TimeUnit.MILLISECONDS);
+ final Map<String, String> attributesToAdd = new HashMap<>();
+ attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
+ attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME,
String.valueOf(executionTimeElapsed));
+ attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+
+ session.putAllAttributes(resultSetFF, attributesToAdd);
+
+ logger.info("{} contains {} records; transferring to
'success'", resultSetFF, nrOfRows.get());
+
+ if (context.hasIncomingConnection()) {
+ session.getProvenanceReporter().fetch(resultSetFF,
"Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+ } else {
+ session.getProvenanceReporter().receive(resultSetFF,
"Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+ }
+
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
}
+ session.transfer(resultSetFF, REL_SUCCESS);
}
+
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
index 434e924d4a..3794c0ba07 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
@@ -16,11 +16,32 @@
*/
package org.apache.plc4x.nifi.address;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
+import java.util.List;
import java.util.Map;
public interface AddressesAccessStrategy {
+ /**
+ * Returns the allowable value associated with the strategy.
+ * @return the allowable value associated with the strategy
+ */
+ AllowableValue getAllowableValue();
+
+ /**
+ * Returns a list of property descriptors needed for the strategy.
+ * @return List of PropertyDescriptor needed for the strategy
+ */
+ List<PropertyDescriptor> getPropertyDescriptors();
+
+ /**
+ * Returns a map with the names and addresses of the tags.
+ * @param context the context of the processor
+ * @param flowFile the FlowFile being processed
+ * @return Map with the tag names and addresses
+ */
Map<String, String> extractAddresses(final ProcessContext context, final
FlowFile flowFile);
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
index bfeff23fdd..9b63d584b2 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
@@ -22,45 +22,75 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
public class AddressesAccessUtils {
- public static final AllowableValue ADDRESS_PROPERTY = new AllowableValue(
- "property-address",
- "Use Properties as Addresses",
- "Each property will be treated as tag-address pairs after
Expression Language is evaluated.");
- public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
- "text-address",
- "Use 'Address Text' Property",
- "Addresses will be obtained from 'Address Text' Property. It's
content must be a valid JSON " +
- "after Expression Language is evaluated. ");
+ private static DefaultPlcDriverManager manager = new
DefaultPlcDriverManager();
- public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = new
PropertyDescriptor.Builder()
- .name("plc4x-address-access-strategy")
- .displayName("Address Access Strategy")
- .description("Strategy used to obtain the PLC addresses")
- .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT)
- .defaultValue(ADDRESS_PROPERTY.getValue())
- .required(true)
- .build();
+ public static DefaultPlcDriverManager getManager() {
+ return manager;
+ }
+
+ public static final AllowableValue ADDRESS_PROPERTY = new
AllowableValue(
+ "property-address",
+ "Use Properties as Addresses",
+ "Each property will be treated as tag-address pairs after
Expression Language is evaluated.");
- public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new
PropertyDescriptor.Builder()
- .name("text-address-property")
- .displayName("Address Text")
- .description("Must contain a valid JSON object after Expression
Language is evaluated. "
- + "Each field-value is treated as tag-address.")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new JsonValidator())
- .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
- .required(true)
- .build();
+ public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
+ "text-address",
+ "Use 'Address Text' Property",
+ "Addresses will be obtained from 'Address Text' Property. Its
content must be valid JSON " +
+ "after Expression Language is evaluated. ");
- public static AddressesAccessStrategy getAccessStrategy(final
ProcessContext context) {
- String value =
context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
- if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
- return new DynamicPropertyAccessStrategy();
- else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
- return new TextPropertyAccessStrategy();
- return null;
- }
+ public static final AllowableValue ADDRESS_FILE = new AllowableValue(
+ "file-address",
+ "Use 'Address File' Property",
+ "Addresses will be obtained from the file in 'Address File'
Property. Its content must be valid JSON " +
+ "after Expression Language is evaluated. ");
+
+ public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY =
new PropertyDescriptor.Builder()
+ .name("plc4x-address-access-strategy")
+ .displayName("Address Access Strategy")
+ .description("Strategy used to obtain the PLC addresses")
+ .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT, ADDRESS_FILE)
+ .defaultValue(ADDRESS_PROPERTY.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new
PropertyDescriptor.Builder()
+ .name("text-address-property")
+ .displayName("Address Text")
+ .description("Must contain a valid JSON object after Expression
Language is evaluated. "
+ + "Each field-value is treated as tag-address.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(new JsonValidator())
+ .addValidator(new
TextPropertyAccessStrategy.TagValidator(manager))
+ .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor ADDRESS_FILE_PROPERTY = new
PropertyDescriptor.Builder()
+ .name("file-address-property")
+ .displayName("Address File")
+ .description("Must contain a valid path after Expression
Language is evaluated. "
+ + "The file content must be a valid JSON and each
field-value is treated as tag-address.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .addValidator(new
FilePropertyAccessStrategy.TagValidator(manager))
+ .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_FILE)
+ .required(true)
+ .build();
+
+ public static AddressesAccessStrategy getAccessStrategy(final
ProcessContext context) {
+ String value =
context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
+ if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
+ return new DynamicPropertyAccessStrategy();
+ else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
+ return new TextPropertyAccessStrategy();
+ else if (ADDRESS_FILE.getValue().equalsIgnoreCase(value))
+ return new FilePropertyAccessStrategy();
+ return null;
+ }
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
new file mode 100644
index 0000000000..c94d7d3141
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.nifi.BasePlc4xProcessor;
+
+
+public abstract class BaseAccessStrategy implements AddressesAccessStrategy{
+ private boolean isInitializated = false;
+ private boolean isDynamic;
+ protected Map<String,String> cachedAddresses = null;
+
+ protected AllowableValue allowableValue;
+ protected List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
+
+ protected Map<String, String> getCachedAddresses() {
+ return cachedAddresses;
+ }
+
+ public Map<String,String> extractAddressesFromResources(final
ProcessContext context, final FlowFile flowFile) {
+ throw new UnsupportedOperationException("Method
'extractAddressesFromResources' not implemented");
+ }
+
+
+ @Override
+ public Map<String, String> extractAddresses(final ProcessContext context,
final FlowFile flowFile) {
+ if (!isInitializated) {
+ getPropertyDescriptors().forEach(prop -> {
+ if (context.isExpressionLanguagePresent(prop)){
+ isDynamic = true;
+ }
+ });
+ isInitializated = true;
+ }
+
+ Map<String, String> result = getCachedAddresses();
+ if (result == null) {
+ result = extractAddressesFromResources(context, flowFile);
+ if (!isDynamic) {
+ cachedAddresses = result;
+ }
+ }
+ return result;
+ }
+
+ public static class TagValidator implements Validator {
+
+ private DefaultPlcDriverManager manager;
+
+ public TagValidator(DefaultPlcDriverManager manager) {
+ this.manager = manager;
+ }
+
+ protected void checkTags(PlcDriver driver, Collection<String> tags) {
+ for (String tag : tags) {
+ driver.prepareTag(tag);
+ }
+ }
+
+ protected Collection<String> getTags(String input) throws Exception {
+ throw new UnsupportedOperationException("Method 'getTags' not
implemented");
+ }
+
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ String connectionString =
context.getProperty(BasePlc4xProcessor.PLC_CONNECTION_STRING).getValue();
+
+ if ((context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) ||
+ context.isExpressionLanguagePresent(connectionString)) {
+ return new
ValidationResult.Builder().subject(subject).input(input)
+ .explanation("Expression Language
Present").valid(true).build();
+ }
+
+ try {
+ PlcDriver driver = manager.getDriverForUrl(connectionString);
+
+ if (!context.isExpressionLanguagePresent(input)) {
+ checkTags(driver, getTags(input));
+ }
+
+ }catch (Exception e) {
+ return new ValidationResult.Builder().subject(subject)
+ .explanation(e.getLocalizedMessage())
+ .valid(false)
+ .build();
+ }
+
+ return new ValidationResult.Builder().subject(subject)
+ .explanation("")
+ .valid(true)
+ .build();
+ }
+ }
+
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
index fe7c3c5db2..879766f6de 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
@@ -17,14 +17,34 @@
package org.apache.plc4x.nifi.address;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
-public class DynamicPropertyAccessStrategy implements AddressesAccessStrategy{
+
+public class DynamicPropertyAccessStrategy extends BaseAccessStrategy{
+
+ @Override
+ public AllowableValue getAllowableValue() {
+ return AddressesAccessUtils.ADDRESS_PROPERTY;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getPropertyDescriptors() {
+ return List.of();
+ }
+
+ @Override
+ public Map<String,String> extractAddressesFromResources(final
ProcessContext context, final FlowFile flowFile) {
+ return extractAddressesFromAttributes(context, flowFile);
+ }
private Map<String,String> extractAddressesFromAttributes(final
ProcessContext context, final FlowFile flowFile) {
Map<String,String> addressMap = new HashMap<>();
@@ -35,7 +55,16 @@ public class DynamicPropertyAccessStrategy implements
AddressesAccessStrategy{
return addressMap;
}
- public Map<String, String> extractAddresses(final ProcessContext context,
final FlowFile flowFile) {
- return extractAddressesFromAttributes(context, flowFile);
+
+ public static class TagValidator extends BaseAccessStrategy.TagValidator {
+ public TagValidator(DefaultPlcDriverManager manager) {
+ super(manager);
+ }
+
+ @Override
+ protected Collection<String> getTags(String input) throws Exception {
+ return List.of(input);
+ }
}
+
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
new file mode 100644
index 0000000000..b3214375b8
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+import java.nio.file.StandardOpenOption;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class FilePropertyAccessStrategy extends BaseAccessStrategy {
+
+ @Override
+ public AllowableValue getAllowableValue() {
+ return AddressesAccessUtils.ADDRESS_FILE;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getPropertyDescriptors() {
+ return List.of(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+ }
+
+ @Override
+ public Map<String, String> extractAddressesFromResources(final
ProcessContext context, final FlowFile flowFile) throws ProcessException{
+ try {
+ return
extractAddressesFromFile(context.getProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
+ } catch (Exception e) {
+ throw new ProcessException(e.toString(), e);
+ }
+ }
+
+ public static Map<String,String> extractAddressesFromFile(String fileName)
throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+
+ Path filePath = Path.of(fileName);
+ try (InputStream input = Files.newInputStream(filePath,
StandardOpenOption.READ)) {
+ return mapper.readerForMapOf(String.class).readValue(input);
+ }
+ }
+
+ public static class TagValidator extends BaseAccessStrategy.TagValidator {
+ public TagValidator(DefaultPlcDriverManager manager) {
+ super(manager);
+ }
+
+ @Override
+ protected Collection<String> getTags(String input) throws Exception {
+ return
FilePropertyAccessStrategy.extractAddressesFromFile(input).values();
+ }
+ }
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
index 7f5c955401..78149f3614 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
@@ -17,30 +17,54 @@
package org.apache.plc4x.nifi.address;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-
+import org.apache.plc4x.java.DefaultPlcDriverManager;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-public class TextPropertyAccessStrategy implements AddressesAccessStrategy{
- private Map<String,String> extractAddressesFromText(String input) throws
JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
+public class TextPropertyAccessStrategy extends BaseAccessStrategy {
- return mapper.readValue(input, Map.class);
+ @Override
+ public AllowableValue getAllowableValue() {
+ return AddressesAccessUtils.ADDRESS_TEXT;
}
@Override
- public Map<String, String> extractAddresses(final ProcessContext context,
final FlowFile flowFile) throws ProcessException{
+ public List<PropertyDescriptor> getPropertyDescriptors() {
+ return List.of(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+ }
+
+ @Override
+ public Map<String, String> extractAddressesFromResources(final
ProcessContext context, final FlowFile flowFile) throws ProcessException{
try {
return
extractAddressesFromText(context.getProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
} catch (Exception e) {
throw new ProcessException(e.toString());
}
-
+ }
+
+ private static Map<String,String> extractAddressesFromText(String input)
throws JsonProcessingException {
+ ObjectMapper mapper = new ObjectMapper();
+
+ return mapper.readerForMapOf(String.class).readValue(input);
+ }
+
+ public static class TagValidator extends BaseAccessStrategy.TagValidator {
+ public TagValidator(DefaultPlcDriverManager manager) {
+ super(manager);
+ }
+
+ @Override
+ protected Collection<String> getTags(String input) throws Exception {
+ return
TextPropertyAccessStrategy.extractAddressesFromText(input).values();
+ }
}
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 69157594ec..f546159e48 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.nifi.record;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -46,12 +47,18 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
private Set<String> rsColumnNames;
private boolean moreRows;
private final boolean debugEnabled = logger.isDebugEnabled();
+ private final String timestampFieldName;
private boolean isSubscription = false;
+ private Instant timestamp;
private final AtomicReference<RecordSchema> recordSchema = new
AtomicReference<>(null);
- public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema) throws IOException {
+ public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema, String timestampFieldName) {
+ this.timestampFieldName = timestampFieldName;
this.readResponse = readResponse;
+ if (!isSubscription) {
+ timestamp = Instant.now();
+ }
moreRows = true;
isSubscription = readResponse.getRequest() == null;
@@ -68,7 +75,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
rsColumnNames = responseDataStructure.keySet();
if (recordSchema == null) {
- Schema avroSchema =
Plc4xCommon.createSchema(responseDataStructure);
+ Schema avroSchema =
Plc4xCommon.createSchema(responseDataStructure, this.timestampFieldName);
this.recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
} else {
this.recordSchema.set(recordSchema);
@@ -78,7 +85,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
}
- public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final
DefaultPlcSubscriptionEvent subscriptionEvent) throws IOException {;
+ public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final
DefaultPlcSubscriptionEvent subscriptionEvent) {
moreRows = true;
if (debugEnabled)
@@ -131,7 +138,7 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
//do nothing
}
- protected Record createRecord(final PlcReadResponse readResponse) throws
IOException{
+ protected Record createRecord(final PlcReadResponse readResponse) {
final Map<String, Object> values = new
HashMap<>(getSchema().getFieldCount());
if (debugEnabled)
@@ -158,7 +165,12 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
}
//add timestamp tag to schema
- values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME,
System.currentTimeMillis());
+ if (isSubscription) {
+ values.put(timestampFieldName, ((DefaultPlcSubscriptionEvent)
readResponse).getTimestamp().toEpochMilli());
+ } else {
+ values.put(timestampFieldName, timestamp.toEpochMilli());
+ }
+
if (debugEnabled)
logger.debug("added timestamp tag to record.");
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
index a950ac3735..1d774e79ef 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
@@ -39,8 +39,8 @@ public interface Plc4xWriter {
* @return the number of rows written to the output stream
* @throws Exception if any errors occur during the writing of the result
set to the output stream
*/
- long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema) throws Exception;
- long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception;
+ long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema, String timestampFieldName) throws Exception;
+ long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema, FlowFile originalFlowFile, String
timestampFieldName) throws Exception;
/**
* Returns a map of attribute key/value pairs to be added to any outgoing
flow file(s). The default implementation is to return an empty map.
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index ca8f0267cd..f1e314b5da 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -55,9 +55,11 @@ public class RecordPlc4xWriter implements Plc4xWriter {
}
@Override
- public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema) throws Exception {
- if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
+ public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger,
+ Plc4xReadResponseRowCallback callback, RecordSchema
recordSchema, String timestampFieldName) throws Exception {
+
+ if (fullRecordSet == null) {
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema,
timestampFieldName);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
Map<String, String> empty = new HashMap<>();
@@ -73,9 +75,11 @@ public class RecordPlc4xWriter implements Plc4xWriter {
}
@Override
- public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception {
- if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
+ public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger,
+ Plc4xReadResponseRowCallback callback, RecordSchema recordSchema,
FlowFile originalFlowFile, String timestampFieldName) throws Exception {
+
+ if (fullRecordSet == null) {
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema,
timestampFieldName);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
@@ -158,8 +162,10 @@ public class RecordPlc4xWriter implements Plc4xWriter {
private static class Plc4xReadResponseRecordSetWithCallback extends
Plc4xReadResponseRecordSet {
private final Plc4xReadResponseRowCallback callback;
- public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse
readResponse, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema)
throws IOException {
- super(readResponse, recordSchema);
+ public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse
readResponse, Plc4xReadResponseRowCallback callback,
+ RecordSchema recordSchema, String timestampFieldName) {
+
+ super(readResponse, recordSchema, timestampFieldName);
this.callback = callback;
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index 801bbd90fb..df80ffd6d6 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -19,9 +19,9 @@
package org.apache.plc4x.nifi.record;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,7 +60,7 @@ public class SchemaCache {
* @param tagsList list of PlcTag's
* @param schema record schema used for PlcResponse serialization. Can be
null
*/
- public void addSchema(final Map<String,String> schemaIdentifier, final
LinkedHashSet<String> tagsNames, final List<? extends PlcTag> tagsList, final
RecordSchema schema) {
+ public void addSchema(final Map<String,String> schemaIdentifier, final
Set<String> tagsNames, final List<? extends PlcTag> tagsList, final
RecordSchema schema) {
if (!schemaMap.containsKey(schemaIdentifier.toString())){
if (nextSchemaPosition.get() == cacheSize.get()){
nextSchemaPosition.set(0);
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index a46cea7ee5..b46f9f72cc 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -45,7 +45,6 @@ public class Plc4xListenerDispatcher implements Runnable {
private ComponentLog logger;
private boolean running = false;
private BlockingQueue<PlcSubscriptionEvent> events;
- private PlcSubscriptionResponse subscriptionResponse;
private PlcConnection connection;
private Long timeout;
private BlockingQueue<PlcSubscriptionEvent> queuedEvents;
@@ -95,7 +94,7 @@ public class Plc4xListenerDispatcher implements Runnable {
}
}
PlcSubscriptionRequest subscriptionRequest = builder.build();
-
+ PlcSubscriptionResponse subscriptionResponse;
try {
subscriptionResponse = subscriptionRequest.execute().get(timeout,
TimeUnit.MILLISECONDS);
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
index a0fd6c9258..00075f6cfc 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
@@ -53,9 +53,6 @@ import org.apache.plc4x.java.spi.values.PlcWORD;
public class Plc4xCommon {
-
- public static final String PLC4X_RECORD_TIMESTAMP_FIELD_NAME = "ts";
-
/**
* This method is used to infer output AVRO schema directly from the
PlcReadResponse object.
* It is directly used from the
RecordPlc4xWriter.writePlcReadResponse() method.
@@ -66,7 +63,7 @@ public class Plc4xCommon {
* @param responseDataStructure: a map that reflects the structure of
the answer given by the PLC when making a Read Request.
* @return AVRO Schema built from responseDataStructure.
*/
- public static Schema createSchema(Map<String, ? extends PlcValue>
responseDataStructure){
+ public static Schema createSchema(Map<String, ? extends PlcValue>
responseDataStructure, String timestampFieldName){
//plc and record datatype map
final FieldAssembler<Schema> builder =
SchemaBuilder.record("PlcReadResponse").namespace("any.data").fields();
String fieldName = null;
@@ -108,7 +105,7 @@ public class Plc4xCommon {
}
//add timestamp tag to schema
- builder.name(PLC4X_RECORD_TIMESTAMP_FIELD_NAME).type().longType().noDefault();
+ builder.name(timestampFieldName).type().longType().noDefault();
return builder.endRecord();
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
index 90d128a9fb..61601724a4 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
@@ -19,12 +19,16 @@ package org.apache.plc4x.nifi;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
import org.apache.plc4x.nifi.util.Plc4xCommonTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,7 +45,7 @@ public class Plc4xSinkProcessorTest {
testRunner.setValidateExpressionUsage(false);
testRunner.setProperty(Plc4xSinkProcessor.PLC_CONNECTION_STRING,
"simulated://127.0.0.1");
- testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
+ testRunner.setProperty(Plc4xSinkProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
testRunner.addConnection(Plc4xSinkProcessor.REL_SUCCESS);
testRunner.addConnection(Plc4xSinkProcessor.REL_FAILURE);
@@ -73,4 +77,17 @@ public class Plc4xSinkProcessorTest {
testProcessor();
}
+ // Test addresses file property access strategy
+ @Test
+ public void testWithAdderessFile() throws InitializationException {
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Plc4xCommonTest.getAddressMap());
+
+ testProcessor();
+ }
+ }
+
}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
index 627ab2e80b..9f22b62bc8 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
@@ -25,9 +25,12 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
import org.apache.plc4x.nifi.util.Plc4xCommonTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -91,4 +94,17 @@ public class Plc4xSinkRecordProcessorTest {
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new
ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
testAvroRecordReaderProcessor();
}
+
+ // Test addresses file property access strategy
+ @Test
+ public void testWithAdderessFile() throws InitializationException {
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Plc4xCommonTest.getAddressMap());
+
+ testAvroRecordReaderProcessor();
+ }
+ }
}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index 331ab51eb1..a5e79ceb03 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -21,9 +21,12 @@ package org.apache.plc4x.nifi;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
import org.apache.plc4x.nifi.util.Plc4xCommonTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,10 +40,11 @@ public class Plc4xSourceProcessorTest {
public void init() {
testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
testRunner.setIncomingConnection(false);
- testRunner.setValidateExpressionUsage(false);
+ testRunner.setValidateExpressionUsage(true);
- testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
- testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
+ testRunner.setVariable("url", "simulated://127.0.0.1");
+ testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "${url}");
+ testRunner.setProperty(Plc4xSourceProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
testRunner.addConnection(Plc4xSourceProcessor.REL_SUCCESS);
testRunner.addConnection(Plc4xSourceProcessor.REL_FAILURE);
@@ -69,4 +73,16 @@ public class Plc4xSourceProcessorTest {
testProcessor();
}
+ // Test addresses file property access strategy
+ @Test
+ public void testWithAdderessFile() {
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Plc4xCommonTest.getAddressMap());
+
+ testProcessor();
+ }
+ }
}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
index 4e7c43102d..5d47d0612c 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
@@ -23,9 +23,12 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
import org.apache.plc4x.nifi.util.Plc4xCommonTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,7 +64,7 @@ public class Plc4xSourceRecordProcessorTest {
testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE,
0);
testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_SUCCESS,
NUMBER_OF_CALLS);
- Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceProcessor.REL_SUCCESS), false, true);
+ Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceRecordProcessor.REL_SUCCESS), false, true);
}
// Test dynamic properties addressess access strategy
@@ -79,4 +82,17 @@ public class Plc4xSourceRecordProcessorTest {
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new
ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
testAvroRecordWriterProcessor();
}
+
+ // Test addresses file property access strategy
+ @Test
+ public void testWithAdderessFile() throws InitializationException {
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Plc4xCommonTest.getAddressMap());
+
+ testAvroRecordWriterProcessor();
+ }
+ }
}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
new file mode 100644
index 0000000000..c244991c61
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
@@ -0,0 +1,190 @@
+package org.apache.plc4x.nifi.address;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.plc4x.nifi.Plc4xSourceProcessor;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class AccessStrategyTest {
+
+ @Mock
+ FilePropertyAccessStrategy testFileObject = new
FilePropertyAccessStrategy();
+
+ private TestRunner testRunner;
+
+ // Tests that addresses in dynamic properties are read correctly and addresses are cached if no EL is used
+ @Test
+ public void testDynamicPropertyAccessStrategy() {
+
+ DynamicPropertyAccessStrategy testObject = new
DynamicPropertyAccessStrategy();
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ assert
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_PROPERTY);
+ assert testObject.getPropertyDescriptors().isEmpty();
+
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(k, v));
+
+ FlowFile flowFile = testRunner.enqueue("");
+
+ Map<String, String> values =
testObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+ assertTrue(testObject.getCachedAddresses().equals(values));
+ assertTrue(testObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+ }
+
+ // Tests incorrect address detection on dynamic properties
+ @Test
+ public void testDynamicPropertyAccessStrategyIncorrect() {
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(k, "no an correct address"));
+
+ testRunner.assertNotValid();
+ }
+
+ // Tests that if EL is present in dynamic properties the processor is valid
+ @Test
+ public void testDynamicPropertyAccessStrategyELPresent() {
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+ testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING,
"simulated://127.0.0.1");
+
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(k, "${attribute}"));
+
+ testRunner.assertValid();
+ }
+
+ // Tests that addresses in text property are read correctly and addresses are cached if no EL is used
+ @Test
+ public void testTextPropertyAccessStrategy() throws
JsonProcessingException {
+
+ TextPropertyAccessStrategy testObject = new
TextPropertyAccessStrategy();
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ assert
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+ assert
testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new
ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
+
+ FlowFile flowFile = testRunner.enqueue("");
+
+ Map<String, String> values =
testObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+ assertTrue(testObject.getCachedAddresses().equals(values));
+ assertTrue(testObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+ }
+
+
+
+ // Tests incorrect address detection on text property
+ @Test
+ public void testTextPropertyAccessStrategyIncorrect() {
+
+ TextPropertyAccessStrategy testObject = new
TextPropertyAccessStrategy();
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ assert
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+ assert
testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(),
"no an correct address"));
+
+ testRunner.assertNotValid();
+
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(),
"{\"neither\":\"this one\"}"));
+
+ testRunner.assertNotValid();
+ }
+
+ // Tests that if EL is present in text property the processor is valid
+ @Test
+ public void testTextPropertyAccessStrategyELPresent() {
+
+ TextPropertyAccessStrategy testObject = new
TextPropertyAccessStrategy();
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING,
"simulated://127.0.0.1");
+
+ assert
testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+ assert
testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(),
"${attribute}"));
+
+ testRunner.assertValid();
+ }
+
+ // Tests that addresses in file are read correctly and addresses are cached if no EL is used
+ @Test
+ public void testFilePropertyAccessStrategy() throws IOException {
+
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ assert
testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+ assert
testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+
+
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Plc4xCommonTest.getAddressMap());
+
+
+ FlowFile flowFile = testRunner.enqueue("");
+ Map<String, String> values =
testFileObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+ assertTrue(testFileObject.getCachedAddresses().equals(values));
+ assertTrue(testFileObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+ }
+ }
+
+ // Tests incorrect address detection on file
+ @Test
+ public void testFilePropertyAccessStrategyIncorrect() throws IOException {
+
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ assert
testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+ assert
testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Map.of("not", "a correct address"));
+
+ testRunner.assertNotValid();
+ }
+ }
+
+ // Tests that if EL is present in file the processor is valid
+ @Test
+ public void testFilePropertyAccessStrategyELPresent() throws IOException {
+
+ testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+ testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING,
"simulated://127.0.0.1");
+
+ assert
testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+ assert
testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY,
"file");
+
+ try (MockedStatic<FilePropertyAccessStrategy> staticMock =
Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+ staticMock.when(() ->
FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+ .thenReturn(Map.of("EL in use", "${attribute}"));
+
+ testRunner.assertValid();
+ }
+ }
+}