This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x-extras.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ddb3da feat(plc4j/nifi): improve logging (#334)
0ddb3da is described below
commit 0ddb3daa7645420ef949d42e67c2d09cc0a00fda
Author: Unai LerĂa Fortea <[email protected]>
AuthorDate: Mon Jun 2 19:30:01 2025 +0200
feat(plc4j/nifi): improve logging (#334)
* Add more debug logging on the component logger
* Remove default plc driver manager creation on connection string validation
* Complete merge after nifi 1 and 2 split
---
.../org/apache/plc4x/nifi/BasePlc4xProcessor.java | 4 +---
.../apache/plc4x/nifi/Plc4xSourceProcessor.java | 8 ++++++-
.../plc4x/nifi/Plc4xSourceRecordProcessor.java | 10 ++++++--
.../nifi/record/Plc4xReadResponseRecordSet.java | 27 +++++++++++-----------
.../plc4x/nifi/record/RecordPlc4xWriter.java | 8 +++----
.../org/apache/plc4x/nifi/record/SchemaCache.java | 7 +++---
.../nifi/subscription/Plc4xListenerDispatcher.java | 14 +++++++++++
.../org/apache/plc4x/nifi/BasePlc4xProcessor.java | 4 +---
.../apache/plc4x/nifi/Plc4xSourceProcessor.java | 8 ++++++-
.../plc4x/nifi/Plc4xSourceRecordProcessor.java | 10 ++++++--
.../nifi/record/Plc4xReadResponseRecordSet.java | 27 +++++++++++-----------
.../plc4x/nifi/record/RecordPlc4xWriter.java | 8 +++----
.../org/apache/plc4x/nifi/record/SchemaCache.java | 7 +++---
.../nifi/subscription/Plc4xListenerDispatcher.java | 14 +++++++++++
14 files changed, 104 insertions(+), 52 deletions(-)
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 05d7cbb..0f55fea 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -45,7 +45,6 @@ import
org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -336,13 +335,12 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
protected static class Plc4xConnectionStringValidator implements Validator
{
@Override
public ValidationResult validate(String subject, String input,
ValidationContext context) {
- DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
return new
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
}
try {
- PlcDriver driver = manager.getDriverForUrl(input);
+ PlcDriver driver =
AddressesAccessUtils.getManager().getDriverForUrl(input);
driver.getConnection(input);
} catch (PlcConnectionException e) {
return new ValidationResult.Builder().subject(subject)
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 9e02be0..bee1aa7 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -61,7 +61,13 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor
{
final ComponentLog logger = getLogger();
final FlowFile flowFile = session.create();
- try(PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
incomingFlowFile))) {
+ final String connectionString = getConnectionString(context,
incomingFlowFile);
+
+ if (debugEnabled) {
+ logger.debug("Get connection for plc: {}", connectionString);
+ }
+
+ try(PlcConnection connection =
getConnectionManager().getConnection(connectionString)) {
if (!connection.getMetadata().isReadSupported()) {
throw new ProcessException("Reading not supported by
connection");
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 33208ce..d1f2e4c 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -132,7 +132,13 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
PlcReadRequest readRequest;
long nrOfRowsHere;
- try (PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
originalFlowFile))) {
+ final String connectionString =
getConnectionString(context, originalFlowFile);
+
+ if (debugEnabled) {
+ logger.debug("Get connection for plc:
{}", connectionString);
+ }
+
+ try (PlcConnection connection =
getConnectionManager().getConnection(connectionString)) {
readRequest = getReadRequest(logger,
addressMap, tags, connection);
@@ -142,7 +148,7 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
} catch (TimeoutException e) {
logger.error("Timeout reading the data
from PLC", e);
-
getConnectionManager().removeCachedConnection(getConnectionString(context,
originalFlowFile));
+
getConnectionManager().removeCachedConnection(connectionString);
throw new ProcessException(e);
} catch (PlcConnectionException e) {
logger.error("Error getting the PLC
connection", e);
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 33c6eaa..ccdc90b 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -36,24 +37,23 @@ import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem;
import org.apache.plc4x.nifi.util.Plc4xCommon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
- private static final Logger logger =
LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+ private final ComponentLog logger;
private final PlcReadResponse readResponse;
private final Set<String> rsColumnNames;
private boolean moreRows;
- private final boolean debugEnabled = logger.isDebugEnabled();
private final String timestampFieldName;
private boolean isSubscription = false;
private Instant timestamp;
private final AtomicReference<RecordSchema> recordSchema = new
AtomicReference<>(null);
- public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema, String timestampFieldName) {
+ public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema, String timestampFieldName, ComponentLog logger) {
this.timestampFieldName = timestampFieldName;
this.readResponse = readResponse;
+ this.logger = logger;
+
if (!isSubscription) {
timestamp = Instant.now();
}
@@ -61,7 +61,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
isSubscription = readResponse.getRequest() == null;
- if (debugEnabled)
+ if (this.logger.isDebugEnabled())
logger.debug("Creating record schema from PlcReadResponse");
Map<String, ? extends PlcValue> responseDataStructure;
@@ -78,7 +78,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
} else {
this.recordSchema.set(recordSchema);
}
- if (debugEnabled)
+ if (this.logger.isDebugEnabled())
logger.debug("Record schema from PlcReadResponse successfuly
created.");
}
@@ -86,7 +86,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final
DefaultPlcSubscriptionEvent subscriptionEvent) {
moreRows = true;
- if (debugEnabled)
+ if (logger.isDebugEnabled())
logger.debug("Creating record schema from
DefaultPlcSubscriptionEvent");
Map<String, PlcValue> responseDataStructure = new HashMap<>();
@@ -139,8 +139,8 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
protected Record createRecord(final PlcReadResponse readResponse) {
final Map<String, Object> values = new
HashMap<>(getSchema().getFieldCount());
- if (debugEnabled)
- logger.debug("creating record.");
+ if (logger.isDebugEnabled())
+ logger.debug("Creating record from plc response");
for (final RecordField tag : getSchema().getFields()) {
final String tagName = tag.getFieldName();
@@ -158,7 +158,8 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
value = null;
}
- logger.trace("Adding {} tag value to record.", tagName);
+ if (logger.isDebugEnabled())
+ logger.debug("Adding {} tag value to record.", tagName);
values.put(tagName, value);
}
@@ -169,8 +170,8 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
values.put(timestampFieldName, timestamp.toEpochMilli());
}
- if (debugEnabled)
- logger.debug("added timestamp tag to record.");
+ if (logger.isDebugEnabled())
+ logger.debug("Adding timestamp tag {} to record.",
timestampFieldName);
return new MapRecord(getSchema(), values);
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index 302180b..ee22588 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -59,7 +59,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
RecordSchema recordSchema, String timestampFieldName) throws
Exception {
if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName);
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName, logger);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
Map<String, String> empty = new HashMap<>();
@@ -79,7 +79,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
RecordSchema recordSchema, FlowFile originalFlowFile, String
timestampFieldName) throws Exception {
if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName);
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName, logger);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
@@ -162,9 +162,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
private static class Plc4xReadResponseRecordSetWithCallback extends
Plc4xReadResponseRecordSet {
public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse
readResponse,
- RecordSchema recordSchema, String timestampFieldName) {
+ RecordSchema recordSchema, String timestampFieldName,
ComponentLog logger) {
- super(readResponse, recordSchema, timestampFieldName);
+ super(readResponse, recordSchema, timestampFieldName, logger);
}
@Override
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index d291d38..b007892 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -61,7 +61,8 @@ public class SchemaCache {
* @param schema record schema used for PlcResponse serialization. Can be
null
*/
public void addSchema(final Map<String,String> schemaIdentifier, final
Set<String> tagsNames, final List<? extends PlcTag> tagsList, final
RecordSchema schema) {
- if (!schemaMap.containsKey(schemaIdentifier.toString())){
+ final String identifier = schemaIdentifier.toString();
+ if (!schemaMap.containsKey(identifier)){
if (nextSchemaPosition.get() == cacheSize.get()){
nextSchemaPosition.set(0);
}
@@ -71,8 +72,8 @@ public class SchemaCache {
for (int i=0; i<tagsNames.size(); i++){
tags.put(tagsNames.toArray(new String[]{})[i],
tagsList.get(i));
}
- schemaMap.put(schemaIdentifier.toString(), new
SchemaContainer(tags, schema));
- schemaAppendOrder.set(nextSchemaPosition.get(),
schemaIdentifier.toString());
+ schemaMap.put(identifier, new SchemaContainer(tags, schema));
+ schemaAppendOrder.set(nextSchemaPosition.get(), identifier);
nextSchemaPosition.getAndAdd(1);
}
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index cac0b39..5dd43d5 100644
---
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -79,6 +79,10 @@ public class Plc4xListenerDispatcher implements Runnable {
throw new PlcProtocolException("This connection does not support
subscription");
}
+ if (logger.isDebugEnabled()){
+ logger.debug("Creating PLC {} subscription for connection {} with
tags {}", subscriptionType, plcConnectionString, tags);
+ }
+
PlcSubscriptionRequest.Builder builder =
connection.subscriptionRequestBuilder();
for (Map.Entry<String, String> entry : tags.entrySet()) {
@@ -96,6 +100,9 @@ public class Plc4xListenerDispatcher implements Runnable {
PlcSubscriptionRequest subscriptionRequest = builder.build();
PlcSubscriptionResponse subscriptionResponse;
try {
+ if (logger.isDebugEnabled()){
+ logger.debug("Submitting PLC {} subscription for connection {}
with tags {}", subscriptionType, plcConnectionString, tags);
+ }
subscriptionResponse = subscriptionRequest.execute().get(timeout,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -109,6 +116,10 @@ public class Plc4xListenerDispatcher implements Runnable {
throw (e instanceof ProcessException) ? (ProcessException) e : new
ProcessException(e);
}
+
+ if (logger.isDebugEnabled()){
+ logger.debug("Registering handlers for PLC {} subscription for
connection {} with tags {}", subscriptionType, plcConnectionString, tags);
+ }
for (PlcSubscriptionHandle handle :
subscriptionResponse.getSubscriptionHandles()) {
handle.register(queuedEvents::offer);
}
@@ -120,6 +131,9 @@ public class Plc4xListenerDispatcher implements Runnable {
* Closes all listeners and stops all handler threads.
*/
public void close() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing listener for ");
+ }
running = false;
try {
connection.close();
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 7d3dac3..2d03e76 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -45,7 +45,6 @@ import
org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -336,13 +335,12 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
protected static class Plc4xConnectionStringValidator implements Validator
{
@Override
public ValidationResult validate(String subject, String input,
ValidationContext context) {
- DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
return new
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
}
try {
- PlcDriver driver = manager.getDriverForUrl(input);
+ PlcDriver driver =
AddressesAccessUtils.getManager().getDriverForUrl(input);
driver.getConnection(input);
} catch (PlcConnectionException e) {
return new ValidationResult.Builder().subject(subject)
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index a0280de..afad369 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -67,7 +67,13 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor
{
final ComponentLog logger = getLogger();
final FlowFile flowFile = session.create();
- try(PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
incomingFlowFile))) {
+ final String connectionString = getConnectionString(context,
incomingFlowFile);
+
+ if (debugEnabled) {
+ logger.debug("Get connection for plc: {}", connectionString);
+ }
+
+ try(PlcConnection connection =
getConnectionManager().getConnection(connectionString)) {
if (!connection.getMetadata().isReadSupported()) {
throw new ProcessException("Reading not supported by
connection");
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 33208ce..d1f2e4c 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -132,7 +132,13 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
PlcReadRequest readRequest;
long nrOfRowsHere;
- try (PlcConnection connection =
getConnectionManager().getConnection(getConnectionString(context,
originalFlowFile))) {
+ final String connectionString =
getConnectionString(context, originalFlowFile);
+
+ if (debugEnabled) {
+ logger.debug("Get connection for plc:
{}", connectionString);
+ }
+
+ try (PlcConnection connection =
getConnectionManager().getConnection(connectionString)) {
readRequest = getReadRequest(logger,
addressMap, tags, connection);
@@ -142,7 +148,7 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
} catch (TimeoutException e) {
logger.error("Timeout reading the data
from PLC", e);
-
getConnectionManager().removeCachedConnection(getConnectionString(context,
originalFlowFile));
+
getConnectionManager().removeCachedConnection(connectionString);
throw new ProcessException(e);
} catch (PlcConnectionException e) {
logger.error("Error getting the PLC
connection", e);
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 33c6eaa..ccdc90b 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -36,24 +37,23 @@ import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem;
import org.apache.plc4x.nifi.util.Plc4xCommon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
- private static final Logger logger =
LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+ private final ComponentLog logger;
private final PlcReadResponse readResponse;
private final Set<String> rsColumnNames;
private boolean moreRows;
- private final boolean debugEnabled = logger.isDebugEnabled();
private final String timestampFieldName;
private boolean isSubscription = false;
private Instant timestamp;
private final AtomicReference<RecordSchema> recordSchema = new
AtomicReference<>(null);
- public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema, String timestampFieldName) {
+ public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema, String timestampFieldName, ComponentLog logger) {
this.timestampFieldName = timestampFieldName;
this.readResponse = readResponse;
+ this.logger = logger;
+
if (!isSubscription) {
timestamp = Instant.now();
}
@@ -61,7 +61,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
isSubscription = readResponse.getRequest() == null;
- if (debugEnabled)
+ if (this.logger.isDebugEnabled())
logger.debug("Creating record schema from PlcReadResponse");
Map<String, ? extends PlcValue> responseDataStructure;
@@ -78,7 +78,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
} else {
this.recordSchema.set(recordSchema);
}
- if (debugEnabled)
+ if (this.logger.isDebugEnabled())
logger.debug("Record schema from PlcReadResponse successfuly
created.");
}
@@ -86,7 +86,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet,
Closeable {
public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final
DefaultPlcSubscriptionEvent subscriptionEvent) {
moreRows = true;
- if (debugEnabled)
+ if (logger.isDebugEnabled())
logger.debug("Creating record schema from
DefaultPlcSubscriptionEvent");
Map<String, PlcValue> responseDataStructure = new HashMap<>();
@@ -139,8 +139,8 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
protected Record createRecord(final PlcReadResponse readResponse) {
final Map<String, Object> values = new
HashMap<>(getSchema().getFieldCount());
- if (debugEnabled)
- logger.debug("creating record.");
+ if (logger.isDebugEnabled())
+ logger.debug("Creating record from plc response");
for (final RecordField tag : getSchema().getFields()) {
final String tagName = tag.getFieldName();
@@ -158,7 +158,8 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
value = null;
}
- logger.trace("Adding {} tag value to record.", tagName);
+ if (logger.isDebugEnabled())
+ logger.debug("Adding {} tag value to record.", tagName);
values.put(tagName, value);
}
@@ -169,8 +170,8 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
values.put(timestampFieldName, timestamp.toEpochMilli());
}
- if (debugEnabled)
- logger.debug("added timestamp tag to record.");
+ if (logger.isDebugEnabled())
+ logger.debug("Adding timestamp tag {} to record.",
timestampFieldName);
return new MapRecord(getSchema(), values);
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index 302180b..ee22588 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -59,7 +59,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
RecordSchema recordSchema, String timestampFieldName) throws
Exception {
if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName);
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName, logger);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
Map<String, String> empty = new HashMap<>();
@@ -79,7 +79,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
RecordSchema recordSchema, FlowFile originalFlowFile, String
timestampFieldName) throws Exception {
if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName);
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, recordSchema,
timestampFieldName, logger);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
@@ -162,9 +162,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
private static class Plc4xReadResponseRecordSetWithCallback extends
Plc4xReadResponseRecordSet {
public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse
readResponse,
- RecordSchema recordSchema, String timestampFieldName) {
+ RecordSchema recordSchema, String timestampFieldName,
ComponentLog logger) {
- super(readResponse, recordSchema, timestampFieldName);
+ super(readResponse, recordSchema, timestampFieldName, logger);
}
@Override
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index d291d38..b007892 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -61,7 +61,8 @@ public class SchemaCache {
* @param schema record schema used for PlcResponse serialization. Can be
null
*/
public void addSchema(final Map<String,String> schemaIdentifier, final
Set<String> tagsNames, final List<? extends PlcTag> tagsList, final
RecordSchema schema) {
- if (!schemaMap.containsKey(schemaIdentifier.toString())){
+ final String identifier = schemaIdentifier.toString();
+ if (!schemaMap.containsKey(identifier)){
if (nextSchemaPosition.get() == cacheSize.get()){
nextSchemaPosition.set(0);
}
@@ -71,8 +72,8 @@ public class SchemaCache {
for (int i=0; i<tagsNames.size(); i++){
tags.put(tagsNames.toArray(new String[]{})[i],
tagsList.get(i));
}
- schemaMap.put(schemaIdentifier.toString(), new
SchemaContainer(tags, schema));
- schemaAppendOrder.set(nextSchemaPosition.get(),
schemaIdentifier.toString());
+ schemaMap.put(identifier, new SchemaContainer(tags, schema));
+ schemaAppendOrder.set(nextSchemaPosition.get(), identifier);
nextSchemaPosition.getAndAdd(1);
}
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index cac0b39..5dd43d5 100644
---
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -79,6 +79,10 @@ public class Plc4xListenerDispatcher implements Runnable {
throw new PlcProtocolException("This connection does not support
subscription");
}
+ if (logger.isDebugEnabled()){
+ logger.debug("Creating PLC {} subscription for connection {} with
tags {}", subscriptionType, plcConnectionString, tags);
+ }
+
PlcSubscriptionRequest.Builder builder =
connection.subscriptionRequestBuilder();
for (Map.Entry<String, String> entry : tags.entrySet()) {
@@ -96,6 +100,9 @@ public class Plc4xListenerDispatcher implements Runnable {
PlcSubscriptionRequest subscriptionRequest = builder.build();
PlcSubscriptionResponse subscriptionResponse;
try {
+ if (logger.isDebugEnabled()){
+ logger.debug("Submitting PLC {} subscription for connection {}
with tags {}", subscriptionType, plcConnectionString, tags);
+ }
subscriptionResponse = subscriptionRequest.execute().get(timeout,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -109,6 +116,10 @@ public class Plc4xListenerDispatcher implements Runnable {
throw (e instanceof ProcessException) ? (ProcessException) e : new
ProcessException(e);
}
+
+ if (logger.isDebugEnabled()){
+ logger.debug("Registering handlers for PLC {} subscription for
connection {} with tags {}", subscriptionType, plcConnectionString, tags);
+ }
for (PlcSubscriptionHandle handle :
subscriptionResponse.getSubscriptionHandles()) {
handle.register(queuedEvents::offer);
}
@@ -120,6 +131,9 @@ public class Plc4xListenerDispatcher implements Runnable {
* Closes all listeners and stops all handler threads.
*/
public void close() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing listener for ");
+ }
running = false;
try {
connection.close();