This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x-extras.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ddb3da  feat(plc4j/nifi): improve logging (#334)
0ddb3da is described below

commit 0ddb3daa7645420ef949d42e67c2d09cc0a00fda
Author: Unai Lería Fortea <[email protected]>
AuthorDate: Mon Jun 2 19:30:01 2025 +0200

    feat(plc4j/nifi): improve logging (#334)
    
    * Add more debug logging on the component logger
    
    * Remove default plc driver manager creation on connection string validation
    
    * Complete merge after nifi 1 and 2 split
---
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  |  4 +---
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    |  8 ++++++-
 .../plc4x/nifi/Plc4xSourceRecordProcessor.java     | 10 ++++++--
 .../nifi/record/Plc4xReadResponseRecordSet.java    | 27 +++++++++++-----------
 .../plc4x/nifi/record/RecordPlc4xWriter.java       |  8 +++----
 .../org/apache/plc4x/nifi/record/SchemaCache.java  |  7 +++---
 .../nifi/subscription/Plc4xListenerDispatcher.java | 14 +++++++++++
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  |  4 +---
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    |  8 ++++++-
 .../plc4x/nifi/Plc4xSourceRecordProcessor.java     | 10 ++++++--
 .../nifi/record/Plc4xReadResponseRecordSet.java    | 27 +++++++++++-----------
 .../plc4x/nifi/record/RecordPlc4xWriter.java       |  8 +++----
 .../org/apache/plc4x/nifi/record/SchemaCache.java  |  7 +++---
 .../nifi/subscription/Plc4xListenerDispatcher.java | 14 +++++++++++
 14 files changed, 104 insertions(+), 52 deletions(-)

diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 05d7cbb..0f55fea 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -45,7 +45,6 @@ import 
org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.plc4x.java.DefaultPlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -336,13 +335,12 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
     protected static class Plc4xConnectionStringValidator implements Validator 
{
         @Override
         public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
             
             if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
                 return new 
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
 Language Present").valid(true).build();
             }
             try {
-                PlcDriver driver =  manager.getDriverForUrl(input);
+                PlcDriver driver =  
AddressesAccessUtils.getManager().getDriverForUrl(input);
                 driver.getConnection(input);
             } catch (PlcConnectionException e) {
                 return new ValidationResult.Builder().subject(subject)
diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 9e02be0..bee1aa7 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -61,7 +61,13 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor 
{
         final ComponentLog logger = getLogger();
         final FlowFile flowFile = session.create();
     
-        try(PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
incomingFlowFile))) {
+        final String connectionString = getConnectionString(context, 
incomingFlowFile);
+
+        if (debugEnabled) {
+            logger.debug("Get connection for plc: {}", connectionString);
+        }
+
+        try(PlcConnection connection = 
getConnectionManager().getConnection(connectionString)) {
 
             if (!connection.getMetadata().isReadSupported()) {
                 throw new ProcessException("Reading not supported by 
connection");
diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 33208ce..d1f2e4c 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -132,7 +132,13 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
                                PlcReadRequest readRequest;
                                long nrOfRowsHere;
 
-                               try (PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
originalFlowFile))) {
+                               final String connectionString = 
getConnectionString(context, originalFlowFile);
+
+                               if (debugEnabled) {
+                                       logger.debug("Get connection for plc: 
{}", connectionString);
+                               }
+
+                               try (PlcConnection connection = 
getConnectionManager().getConnection(connectionString)) {
                                        
                                        readRequest =  getReadRequest(logger, 
addressMap, tags, connection);
                                        
@@ -142,7 +148,7 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
 
                                } catch (TimeoutException e) {
                                        logger.error("Timeout reading the data 
from PLC", e);
-                                       
getConnectionManager().removeCachedConnection(getConnectionString(context, 
originalFlowFile));
+                                       
getConnectionManager().removeCachedConnection(connectionString);
                                        throw new ProcessException(e);
                                } catch (PlcConnectionException e) {
                                        logger.error("Error getting the PLC 
connection", e);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 33c6eaa..ccdc90b 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -36,24 +37,23 @@ import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
 import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem;
 import org.apache.plc4x.nifi.util.Plc4xCommon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
-    private static final Logger logger = 
LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+    private final ComponentLog logger;
     private final PlcReadResponse readResponse;
     private final Set<String> rsColumnNames;
     private boolean moreRows;
-    private final boolean debugEnabled = logger.isDebugEnabled();
     private final String timestampFieldName; 
     private boolean isSubscription = false;
     private Instant timestamp;
 
        private final AtomicReference<RecordSchema> recordSchema = new 
AtomicReference<>(null);
 
-    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema, String timestampFieldName) {
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema, String timestampFieldName, ComponentLog logger) {
         this.timestampFieldName = timestampFieldName;
         this.readResponse = readResponse;
+        this.logger = logger;
+
         if (!isSubscription) {
             timestamp = Instant.now();
         }
@@ -61,7 +61,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
         
         isSubscription = readResponse.getRequest() == null;
 
-        if (debugEnabled)
+        if (this.logger.isDebugEnabled())
             logger.debug("Creating record schema from PlcReadResponse");
         
         Map<String, ? extends PlcValue> responseDataStructure;
@@ -78,7 +78,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
         } else {
             this.recordSchema.set(recordSchema);
         }
-        if (debugEnabled)
+        if (this.logger.isDebugEnabled())
             logger.debug("Record schema from PlcReadResponse successfuly 
created.");
 
     }
@@ -86,7 +86,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
     public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final 
DefaultPlcSubscriptionEvent subscriptionEvent) {
         moreRows = true;
         
-        if (debugEnabled)
+        if (logger.isDebugEnabled())
             logger.debug("Creating record schema from 
DefaultPlcSubscriptionEvent");
         
         Map<String, PlcValue> responseDataStructure = new HashMap<>();
@@ -139,8 +139,8 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
     protected Record createRecord(final PlcReadResponse readResponse) {
         final Map<String, Object> values = new 
HashMap<>(getSchema().getFieldCount());
 
-        if (debugEnabled)
-            logger.debug("creating record.");
+        if (logger.isDebugEnabled())
+            logger.debug("Creating record from plc response");
 
         for (final RecordField tag : getSchema().getFields()) {
             final String tagName = tag.getFieldName();
@@ -158,7 +158,8 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
                 value = null;
             }
             
-            logger.trace("Adding {} tag value to record.", tagName);
+            if (logger.isDebugEnabled())
+                logger.debug("Adding {} tag value to record.", tagName);
             values.put(tagName, value);
         }
 
@@ -169,8 +170,8 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
             values.put(timestampFieldName, timestamp.toEpochMilli());
         }
         
-        if (debugEnabled)
-            logger.debug("added timestamp tag to record.");
+        if (logger.isDebugEnabled())
+            logger.debug("Adding timestamp tag {} to record.", 
timestampFieldName);
 
                
         return new MapRecord(getSchema(), values);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index 302180b..ee22588 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -59,7 +59,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
                 RecordSchema recordSchema, String timestampFieldName) throws 
Exception {
         
         if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName);
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName, logger);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
         Map<String, String> empty = new HashMap<>();
@@ -79,7 +79,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
             RecordSchema recordSchema, FlowFile originalFlowFile, String 
timestampFieldName) throws Exception {
         
         if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName);
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName, logger);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
 
@@ -162,9 +162,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     private static class Plc4xReadResponseRecordSetWithCallback extends 
Plc4xReadResponseRecordSet {
 
         public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse 
readResponse, 
-                RecordSchema recordSchema, String timestampFieldName) {
+                RecordSchema recordSchema, String timestampFieldName, 
ComponentLog logger) {
 
-            super(readResponse, recordSchema, timestampFieldName);
+            super(readResponse, recordSchema, timestampFieldName, logger);
         }
 
         @Override
diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index d291d38..b007892 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -61,7 +61,8 @@ public class SchemaCache {
      * @param schema record schema used for PlcResponse serialization. Can be 
null
      */
     public void addSchema(final Map<String,String> schemaIdentifier, final 
Set<String> tagsNames, final List<? extends PlcTag> tagsList,  final 
RecordSchema schema) {        
-        if (!schemaMap.containsKey(schemaIdentifier.toString())){
+        final String identifier = schemaIdentifier.toString();
+        if (!schemaMap.containsKey(identifier)){
             if (nextSchemaPosition.get() == cacheSize.get()){
                 nextSchemaPosition.set(0);
             }
@@ -71,8 +72,8 @@ public class SchemaCache {
             for (int i=0; i<tagsNames.size(); i++){
                 tags.put(tagsNames.toArray(new String[]{})[i], 
tagsList.get(i));
             }
-            schemaMap.put(schemaIdentifier.toString(), new 
SchemaContainer(tags, schema));
-            schemaAppendOrder.set(nextSchemaPosition.get(), 
schemaIdentifier.toString());
+            schemaMap.put(identifier, new SchemaContainer(tags, schema));
+            schemaAppendOrder.set(nextSchemaPosition.get(), identifier);
             nextSchemaPosition.getAndAdd(1);
         }    
     }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index cac0b39..5dd43d5 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-1/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -79,6 +79,10 @@ public class Plc4xListenerDispatcher implements Runnable {
             throw new PlcProtocolException("This connection does not support 
subscription");
         }
 
+        if (logger.isDebugEnabled()){
+            logger.debug("Creating PLC {} subscription for connection {} with 
tags {}", subscriptionType, plcConnectionString, tags);
+        }
+
         PlcSubscriptionRequest.Builder builder = 
connection.subscriptionRequestBuilder();
 
         for (Map.Entry<String, String> entry : tags.entrySet()) {
@@ -96,6 +100,9 @@ public class Plc4xListenerDispatcher implements Runnable {
         PlcSubscriptionRequest subscriptionRequest = builder.build();
         PlcSubscriptionResponse subscriptionResponse;
         try {
+            if (logger.isDebugEnabled()){
+                logger.debug("Submitting PLC {} subscription for connection {} 
with tags {}", subscriptionType, plcConnectionString, tags);
+            }
             subscriptionResponse = subscriptionRequest.execute().get(timeout, 
TimeUnit.MILLISECONDS);
             
         } catch (InterruptedException e) {
@@ -109,6 +116,10 @@ public class Plc4xListenerDispatcher implements Runnable {
             throw (e instanceof ProcessException) ? (ProcessException) e : new 
ProcessException(e);
         }
 
+
+        if (logger.isDebugEnabled()){
+            logger.debug("Registering handlers for PLC {} subscription for 
connection {} with tags {}", subscriptionType, plcConnectionString, tags);
+        }
         for (PlcSubscriptionHandle handle : 
subscriptionResponse.getSubscriptionHandles()) {
             handle.register(queuedEvents::offer);
         }
@@ -120,6 +131,9 @@ public class Plc4xListenerDispatcher implements Runnable {
      * Closes all listeners and stops all handler threads.
      */
     public void close() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing listener for ");
+        }
         running = false;
         try {
             connection.close();
diff --git 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 7d3dac3..2d03e76 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -45,7 +45,6 @@ import 
org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.plc4x.java.DefaultPlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -336,13 +335,12 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
     protected static class Plc4xConnectionStringValidator implements Validator 
{
         @Override
         public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
             
             if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
                 return new 
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
 Language Present").valid(true).build();
             }
             try {
-                PlcDriver driver =  manager.getDriverForUrl(input);
+                PlcDriver driver =  
AddressesAccessUtils.getManager().getDriverForUrl(input);
                 driver.getConnection(input);
             } catch (PlcConnectionException e) {
                 return new ValidationResult.Builder().subject(subject)
diff --git 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index a0280de..afad369 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -67,7 +67,13 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor 
{
         final ComponentLog logger = getLogger();
         final FlowFile flowFile = session.create();
     
-        try(PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
incomingFlowFile))) {
+        final String connectionString = getConnectionString(context, 
incomingFlowFile);
+
+        if (debugEnabled) {
+            logger.debug("Get connection for plc: {}", connectionString);
+        }
+
+        try(PlcConnection connection = 
getConnectionManager().getConnection(connectionString)) {
 
             if (!connection.getMetadata().isReadSupported()) {
                 throw new ProcessException("Reading not supported by 
connection");
diff --git 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 33208ce..d1f2e4c 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -132,7 +132,13 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
                                PlcReadRequest readRequest;
                                long nrOfRowsHere;
 
-                               try (PlcConnection connection = 
getConnectionManager().getConnection(getConnectionString(context, 
originalFlowFile))) {
+                               final String connectionString = 
getConnectionString(context, originalFlowFile);
+
+                               if (debugEnabled) {
+                                       logger.debug("Get connection for plc: 
{}", connectionString);
+                               }
+
+                               try (PlcConnection connection = 
getConnectionManager().getConnection(connectionString)) {
                                        
                                        readRequest =  getReadRequest(logger, 
addressMap, tags, connection);
                                        
@@ -142,7 +148,7 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
 
                                } catch (TimeoutException e) {
                                        logger.error("Timeout reading the data 
from PLC", e);
-                                       
getConnectionManager().removeCachedConnection(getConnectionString(context, 
originalFlowFile));
+                                       
getConnectionManager().removeCachedConnection(connectionString);
                                        throw new ProcessException(e);
                                } catch (PlcConnectionException e) {
                                        logger.error("Error getting the PLC 
connection", e);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 33c6eaa..ccdc90b 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -36,24 +37,23 @@ import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
 import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem;
 import org.apache.plc4x.nifi.util.Plc4xCommon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
-    private static final Logger logger = 
LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+    private final ComponentLog logger;
     private final PlcReadResponse readResponse;
     private final Set<String> rsColumnNames;
     private boolean moreRows;
-    private final boolean debugEnabled = logger.isDebugEnabled();
     private final String timestampFieldName; 
     private boolean isSubscription = false;
     private Instant timestamp;
 
        private final AtomicReference<RecordSchema> recordSchema = new 
AtomicReference<>(null);
 
-    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema, String timestampFieldName) {
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema, String timestampFieldName, ComponentLog logger) {
         this.timestampFieldName = timestampFieldName;
         this.readResponse = readResponse;
+        this.logger = logger;
+
         if (!isSubscription) {
             timestamp = Instant.now();
         }
@@ -61,7 +61,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
         
         isSubscription = readResponse.getRequest() == null;
 
-        if (debugEnabled)
+        if (this.logger.isDebugEnabled())
             logger.debug("Creating record schema from PlcReadResponse");
         
         Map<String, ? extends PlcValue> responseDataStructure;
@@ -78,7 +78,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
         } else {
             this.recordSchema.set(recordSchema);
         }
-        if (debugEnabled)
+        if (this.logger.isDebugEnabled())
             logger.debug("Record schema from PlcReadResponse successfuly 
created.");
 
     }
@@ -86,7 +86,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, 
Closeable {
     public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final 
DefaultPlcSubscriptionEvent subscriptionEvent) {
         moreRows = true;
         
-        if (debugEnabled)
+        if (logger.isDebugEnabled())
             logger.debug("Creating record schema from 
DefaultPlcSubscriptionEvent");
         
         Map<String, PlcValue> responseDataStructure = new HashMap<>();
@@ -139,8 +139,8 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
     protected Record createRecord(final PlcReadResponse readResponse) {
         final Map<String, Object> values = new 
HashMap<>(getSchema().getFieldCount());
 
-        if (debugEnabled)
-            logger.debug("creating record.");
+        if (logger.isDebugEnabled())
+            logger.debug("Creating record from plc response");
 
         for (final RecordField tag : getSchema().getFields()) {
             final String tagName = tag.getFieldName();
@@ -158,7 +158,8 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
                 value = null;
             }
             
-            logger.trace("Adding {} tag value to record.", tagName);
+            if (logger.isDebugEnabled())
+                logger.debug("Adding {} tag value to record.", tagName);
             values.put(tagName, value);
         }
 
@@ -169,8 +170,8 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
             values.put(timestampFieldName, timestamp.toEpochMilli());
         }
         
-        if (debugEnabled)
-            logger.debug("added timestamp tag to record.");
+        if (logger.isDebugEnabled())
+            logger.debug("Adding timestamp tag {} to record.", 
timestampFieldName);
 
                
         return new MapRecord(getSchema(), values);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index 302180b..ee22588 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -59,7 +59,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
                 RecordSchema recordSchema, String timestampFieldName) throws 
Exception {
         
         if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName);
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName, logger);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
         Map<String, String> empty = new HashMap<>();
@@ -79,7 +79,7 @@ public class RecordPlc4xWriter implements Plc4xWriter {
             RecordSchema recordSchema, FlowFile originalFlowFile, String 
timestampFieldName) throws Exception {
         
         if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName);
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, recordSchema, 
timestampFieldName, logger);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
 
@@ -162,9 +162,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     private static class Plc4xReadResponseRecordSetWithCallback extends 
Plc4xReadResponseRecordSet {
 
         public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse 
readResponse, 
-                RecordSchema recordSchema, String timestampFieldName) {
+                RecordSchema recordSchema, String timestampFieldName, 
ComponentLog logger) {
 
-            super(readResponse, recordSchema, timestampFieldName);
+            super(readResponse, recordSchema, timestampFieldName, logger);
         }
 
         @Override
diff --git a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index d291d38..b007892 100644
--- a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++ b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -61,7 +61,8 @@ public class SchemaCache {
      * @param schema record schema used for PlcResponse serialization. Can be 
null
      */
     public void addSchema(final Map<String,String> schemaIdentifier, final 
Set<String> tagsNames, final List<? extends PlcTag> tagsList,  final 
RecordSchema schema) {        
-        if (!schemaMap.containsKey(schemaIdentifier.toString())){
+        final String identifier = schemaIdentifier.toString();
+        if (!schemaMap.containsKey(identifier)){
             if (nextSchemaPosition.get() == cacheSize.get()){
                 nextSchemaPosition.set(0);
             }
@@ -71,8 +72,8 @@ public class SchemaCache {
             for (int i=0; i<tagsNames.size(); i++){
                 tags.put(tagsNames.toArray(new String[]{})[i], 
tagsList.get(i));
             }
-            schemaMap.put(schemaIdentifier.toString(), new 
SchemaContainer(tags, schema));
-            schemaAppendOrder.set(nextSchemaPosition.get(), 
schemaIdentifier.toString());
+            schemaMap.put(identifier, new SchemaContainer(tags, schema));
+            schemaAppendOrder.set(nextSchemaPosition.get(), identifier);
             nextSchemaPosition.getAndAdd(1);
         }    
     }
diff --git a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index cac0b39..5dd43d5 100644
--- a/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++ b/plc4j/integrations/apache-nifi/nifi-2/nifi-2-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -79,6 +79,10 @@ public class Plc4xListenerDispatcher implements Runnable {
             throw new PlcProtocolException("This connection does not support 
subscription");
         }
 
+        if (logger.isDebugEnabled()){
+            logger.debug("Creating PLC {} subscription for connection {} with 
tags {}", subscriptionType, plcConnectionString, tags);
+        }
+
         PlcSubscriptionRequest.Builder builder = 
connection.subscriptionRequestBuilder();
 
         for (Map.Entry<String, String> entry : tags.entrySet()) {
@@ -96,6 +100,9 @@ public class Plc4xListenerDispatcher implements Runnable {
         PlcSubscriptionRequest subscriptionRequest = builder.build();
         PlcSubscriptionResponse subscriptionResponse;
         try {
+            if (logger.isDebugEnabled()){
+                logger.debug("Submitting PLC {} subscription for connection {} 
with tags {}", subscriptionType, plcConnectionString, tags);
+            }
             subscriptionResponse = subscriptionRequest.execute().get(timeout, 
TimeUnit.MILLISECONDS);
             
         } catch (InterruptedException e) {
@@ -109,6 +116,10 @@ public class Plc4xListenerDispatcher implements Runnable {
             throw (e instanceof ProcessException) ? (ProcessException) e : new 
ProcessException(e);
         }
 
+
+        if (logger.isDebugEnabled()){
+            logger.debug("Registering handlers for PLC {} subscription for 
connection {} with tags {}", subscriptionType, plcConnectionString, tags);
+        }
         for (PlcSubscriptionHandle handle : 
subscriptionResponse.getSubscriptionHandles()) {
             handle.register(queuedEvents::offer);
         }
@@ -120,6 +131,9 @@ public class Plc4xListenerDispatcher implements Runnable {
      * Closes all listeners and stops all handler threads.
      */
     public void close() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing listener for ");
+        }
         running = false;
         try {
             connection.close();


Reply via email to