ottobackwards commented on a change in pull request #242:
URL: https://github.com/apache/plc4x/pull/242#discussion_r619312290
##########
File path: plc4j/integrations/apache-nifi/README.md
##########
@@ -0,0 +1,55 @@
+# PLC4X Apache NiFi Integration
+
+## Plc4xSinkProcessor
+
+## Plc4xSourceProcessor
+
+## Plc4xSourceRecordProcessor
+
+This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer. An example of operation is included on the following path:
+*./test-nifi-template/NIFI-PLC4XIntegration-record-example-1.12.xml*. This file is a NiFi Template that can be imported directly from the NiFi UI to test the operation.
+
Review comment:
Maybe add a link to the NiFi documentation here.
##########
File path: plc4j/integrations/apache-nifi/README.md
##########
@@ -0,0 +1,55 @@
+# PLC4X Apache NiFi Integration
+
+## Plc4xSinkProcessor
+
+## Plc4xSourceProcessor
+
+## Plc4xSourceRecordProcessor
+
+This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer. An example of operation is included on the following path:
+*./test-nifi-template/NIFI-PLC4XIntegration-record-example-1.12.xml*. This file is a NiFi Template that can be imported directly from the NiFi UI to test the operation.
+
+The Plc4xSourceRecord Processor can be configured using the following **properties**:
+
+- *PLC connection String:* PLC4X connection string used to connect to a given PLC device.
+- *PLC resource address String:* PLC4X address string used to identify the resource to read/write on a given PLC device (multiple values supported). The expected format is: {name}={address}(;{name}={address}*)
+- *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.
+- *Force Reconnect every request:* Specifies if the connection to the PLC will be recreated on every trigger event.
+
+An *example* of these properties for reading values from an S7-1200:
+
+- *PLC connection String:* *s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
+- *PLC resource address String:* *var1=%DB1:DBX0.0:BOOL;var2=%DB1:DBX0.1:BOOL;var3=%DB1:DBB01:BYTE;var4=%DB1:DBW02:WORD;var5=%DB1:DBW04:INT*
+- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
+- *Force Reconnect every request:* *false*
+
Review comment:
The discussion of the schema is important here: how the schema is generated by default, and what the format of the data is if you choose to write your own schema, etc.
Would you want to _explicitly_ recommend inheriting the schema?
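To make that concrete in the README, even a hand-written illustration would help. Something like the following (purely hypothetical: the record name, field names, and type mappings are guesses based on the S7 example in the README, not the actual generated schema):

```json
{
  "type": "record",
  "name": "PlcReadResponse",
  "fields": [
    { "name": "var1", "type": "boolean" },
    { "name": "var2", "type": "boolean" },
    { "name": "var3", "type": "int" },
    { "name": "var4", "type": "int" },
    { "name": "var5", "type": "int" },
    { "name": "ts",   "type": "long" }
  ]
}
```

Documenting this shape is what would let users decide between inheriting the schema and writing their own.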
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
##########
@@ -65,9 +65,14 @@ Licensed to the Apache Software Foundation (ASF) under one
@Override
protected void init(final ProcessorInitializationContext context) {
 		this.descriptors = Arrays.asList(PLC_CONNECTION_STRING, PLC_ADDRESS_STRING);
-		this.relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
+		this.relationships = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
}
+
Review comment:
These should be removed. All access to parameters should go through the context.
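A rough sketch of the pattern being asked for (plain Java; a `Map` stands in for NiFi's `ProcessContext`, and all names here are hypothetical): each trigger reads its settings through the context it is handed, instead of from fields cached at init time.

```java
import java.util.Map;

// Hypothetical stand-in: a Map plays the role of NiFi's ProcessContext.
// The processor keeps no cached connectionString/force fields; every
// trigger looks its settings up through the context it receives.
class ContextDrivenProcessor {
    String onTrigger(Map<String, String> context) {
        // Read per-trigger: no stale state if the user reconfigures the processor.
        String connectionString = context.get("plc4x-connection-string");
        boolean force = Boolean.parseBoolean(context.getOrDefault("plc4x-reconnect-force", "true"));
        return connectionString + " force=" + force;
    }
}
```

This also keeps the processor free of shared mutable state between scheduling and triggering.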
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
##########
@@ -0,0 +1,269 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.nifi;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
+import org.apache.plc4x.nifi.record.RecordPlc4xWriter;
+import org.apache.plc4x.nifi.util.PLC4X_PROTOCOL;
+import org.apache.plc4x.nifi.util.Plc4xCommon;
+
+@Tags({ "plc4x-source" })
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X")
+@WritesAttributes({ @WritesAttribute(attribute = "value", description = "some value") })
+public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
+
+	public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
+	public static final String RESULT_QUERY_DURATION = "plc4x.read.query.duration";
+	public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.read.query.executiontime";
+	public static final String RESULT_QUERY_FETCH_TIME = "plc4x.read.query.fetchtime";
+	public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+	public static final String RESULT_ERROR_MESSAGE = "plc4x.read.error.message";
+
+	public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+			.name("plc4x-record-writer").displayName("Record Writer")
+			.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+					+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+			.identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
+
+	public static final PropertyDescriptor FORCE_RECONNECT = new PropertyDescriptor.Builder()
+			.name("plc4x-reconnect-force").displayName("Force Reconnect every request")
+			.description("Specifies if the connection to plc will be recreated on trigger event")
+			.required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("true").build();
+
+ public Plc4xSourceRecordProcessor() {
+
+ }
+
+ private PlcConnection connection = null;
+ private PooledPlcDriverManager pool;
+
+ private static PLC4X_PROTOCOL PROTOCOL = null;
+
+	@Override
+	@OnScheduled
+	public void onScheduled(final ProcessContext context) {
+		super.onScheduled(context);
+		//TODO: Change this to use NiFi service instead of direct connection and add @OnStopped Phase to close connection
+		try {
+			Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+			pool = new PooledPlcDriverManager();
+			if(!force) {
+				this.connection = pool.getConnection(getConnectionString());
+			}
+			//TODO how to infer protocol within the writer?
+			PROTOCOL = Plc4xCommon.getConnectionProtocol(getConnectionString());
+		} catch (PlcConnectionException e) {
+			if(this.connection != null)
+				try {
+					this.connection.close();
+				} catch (Exception e1) {
+					//do nothing
+				}
+			getLogger().error("Error while creating the connection to " + getConnectionString(), e);
+		}
+	}
+
+
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ super.init(context);
+ final Set<Relationship> r = new HashSet<>();
+ r.addAll(super.getRelationships());
+ this.relationships = Collections.unmodifiableSet(r);
+
+ final List<PropertyDescriptor> pds = new ArrayList<>();
+ pds.addAll(super.getSupportedPropertyDescriptors());
+ pds.add(RECORD_WRITER_FACTORY);
+ pds.add(FORCE_RECONNECT);
+ this.descriptors = Collections.unmodifiableList(pds);
+ }
+
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		FlowFile fileToProcess = null;
+		//TODO: In the future the processor will be configurable to get the address and the connection from incoming flowfile
Review comment:
This processor is set to INPUT_FORBIDDEN, so there should never be an incoming connection.
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
##########
@@ -0,0 +1,269 @@
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		FlowFile fileToProcess = null;
+		//TODO: In the future the processor will be configurable to get the address and the connection from incoming flowfile
+		if (context.hasIncomingConnection()) {
+			fileToProcess = session.get();
+			// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+			// However, if we have no FlowFile and we have connections coming from other
+			// Processors, then we know that we should run only if we have a FlowFile.
+			if (fileToProcess == null && context.hasNonLoopConnection()) {
+				return;
+			}
+		}
+
+		final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+		Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+		Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+		final ComponentLog logger = getLogger();
+		logger.info("Te connection {} will be recreated? (Force Reconnect every request) is {}", new Object[] { getConnectionString(), force});
+		// Get an instance of a component able to read from a PLC.
+		// TODO: Change this to use NiFi service instead of direct connection
+		final AtomicLong nrOfRows = new AtomicLong(0L);
+		final StopWatch executeTime = new StopWatch(true);
+		try {
+			if(force) {
+				logger.debug("Recreating the connection {} because the parameter (Force Reconnect every request) is {}", new Object[] { getConnectionString(), force});
+				this.connection = pool.getConnection(getConnectionString());
+			}
+			if(this.connection != null) {
+				// Prepare the request.
+				if (!connection.getMetadata().canRead()) {
+					throw new ProcessException("Reading not supported by connection");
+				}
Review comment:
I'm confused about the input file here
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
##########
@@ -30,14 +30,14 @@ Licensed to the Apache Software Foundation (ASF) under one
public abstract class BasePlc4xProcessor extends AbstractProcessor {
Review comment:
I think we would want to support Expression Language here, with parameter support. What if someone has a TEST lab and a production setup, with different connection strings and/or addresses?
They may want to write a flow, test it, and deploy it with the NiFi Registry, just replacing the parameters to point at the prod devices.
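For illustration, with Expression Language/parameter support enabled on those property descriptors, the flow could reference Parameter Context values rather than hard-coded strings (the parameter names below are made up):

```
PLC connection String:       #{plc-connection-string}
PLC resource address String: #{plc-address-map}
```

Promoting the flow from the TEST lab to production would then just mean swapping the Parameter Context.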
##########
File path: plc4j/integrations/apache-nifi/README.md
##########
@@ -0,0 +1,55 @@
+# PLC4X Apache NiFi Integration
+
+## Plc4xSinkProcessor
+
+## Plc4xSourceProcessor
+
+## Plc4xSourceRecordProcessor
+
+This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer. An example of operation is included on the following path:
+*./test-nifi-template/NIFI-PLC4XIntegration-record-example-1.12.xml*. This file is a NiFi Template that can be imported directly from the NiFi UI to test the operation.
+
+The Plc4xSourceRecord Processor can be configured using the following **properties**:
+
Review comment:
This is an example where a base plus protocol-specific derived processors would make things better for the user; the documentation could then be specific to the protocol.
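A minimal sketch of that split (plain Java, hypothetical class and method names, not the actual processor API): shared behavior stays in the base class, and each protocol-specific subclass pins down the protocol and can carry protocol-specific documentation.

```java
// Hypothetical sketch (made-up names) of the base + protocol-specific split:
// common behavior lives in the base class, while each derived processor
// fixes the protocol and documents its own properties.
abstract class BaseSourceSketch {
    abstract String protocolScheme();

    // Shared logic: build a connection string from protocol-agnostic parts.
    String connectionStringFor(String hostAndPort) {
        return protocolScheme() + "://" + hostAndPort;
    }
}

class S7SourceSketch extends BaseSourceSketch {
    @Override
    String protocolScheme() { return "s7"; }
}
```

The README for each derived processor could then document only the connection options valid for its protocol.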
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
##########
@@ -30,14 +30,14 @@ Licensed to the Apache Software Foundation (ASF) under one
public abstract class BasePlc4xProcessor extends AbstractProcessor {
Review comment:
So, maybe we don't want to use PLC here. Just Connection String and resource address. Not everything will technically be a PLC, and if derived processors have to use these properties, it would really stick out.
@chrisdutz thoughts?
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
##########
@@ -0,0 +1,269 @@
+	@Override
+	@OnScheduled
+	public void onScheduled(final ProcessContext context) {
+		super.onScheduled(context);
+		//TODO: Change this to use NiFi service instead of direct connection and add @OnStopped Phase to close connection
+		try {
+			Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+			pool = new PooledPlcDriverManager();
+			if(!force) {
Review comment:
These should be accessed as properties through the context.
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
##########
@@ -0,0 +1,172 @@
+package org.apache.plc4x.nifi.record;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Array;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.nifi.util.PLC4X_DATA_TYPE;
+import org.apache.plc4x.nifi.util.PLC4X_PROTOCOL;
+import org.apache.plc4x.nifi.util.Plc4xCommon;
+import org.slf4j.LoggerFactory;
+
+
+public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+    private final PlcReadResponse readResponse;
+    private final RecordSchema schema;
+    private final Set<String> rsColumnNames;
+    private boolean moreRows;
+
+
+    public Plc4xReadResponseRecordSet(final Map<String, String> plcAddressMap, final PlcReadResponse readResponse, final RecordSchema readerSchema, PLC4X_PROTOCOL PROTOCOL) throws IOException {
+        this.readResponse = readResponse;
+        moreRows = true;
+        this.schema = createSchema(plcAddressMap, readerSchema, true, PROTOCOL);
+        rsColumnNames = plcAddressMap.keySet();
+
+    }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ // Protected methods for subclasses to access private member variables
+ protected PlcReadResponse getReadResponse() {
+ return readResponse;
+ }
+
+ protected boolean hasMoreRows() {
+ return moreRows;
+ }
+
+ protected void setMoreRows(boolean moreRows) {
+ this.moreRows = moreRows;
+ }
+
+ @Override
+ public Record next() throws IOException {
+ if (moreRows) {
+ final Record record = createRecord(readResponse);
+ setMoreRows(false);
+ return record;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ //do nothing
+ }
+
+    protected Record createRecord(final PlcReadResponse readResponse) throws IOException {
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+ for (final RecordField field : schema.getFields()) {
+ final String fieldName = field.getFieldName();
+
+ final Object value;
+ if (rsColumnNames.contains(fieldName)) {
+ value = normalizeValue(readResponse.getObject(fieldName));
+ } else {
+ value = null;
+ }
+
+ values.put(fieldName, value);
+ }
+
+ //TODO add timestamp field to schema
+        values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, System.currentTimeMillis());
+
+ return new MapRecord(schema, values);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private Object normalizeValue(final Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof List) {
+ return ((List) value).toArray();
+ }
+ return value;
+ }
+
Review comment:
As stated on the list, I think creating the schema once, *from* the data, when the first data is retrieved, would be best.
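A minimal sketch of that lazy, create-once approach (plain Java, hypothetical names; the real version would build a `RecordSchema` from the first `PlcReadResponse`):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: derive the schema from the first response only,
// then reuse the cached value for every subsequent record.
class LazySchemaCache {
    private final AtomicReference<List<String>> schema = new AtomicReference<>();

    List<String> schemaFor(Map<String, Object> response) {
        // Only the first call actually derives field names; later calls
        // return the already-cached schema untouched.
        schema.compareAndSet(null, List.copyOf(response.keySet()));
        return schema.get();
    }
}
```

That avoids recomputing the schema per record and keeps it consistent with what the device actually returned.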
##########
File path:
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
##########
@@ -0,0 +1,269 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.nifi;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
+import org.apache.plc4x.nifi.record.RecordPlc4xWriter;
+import org.apache.plc4x.nifi.util.PLC4X_PROTOCOL;
+import org.apache.plc4x.nifi.util.Plc4xCommon;
+
+@Tags({ "plc4x-source" })
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X")
+@WritesAttributes({ @WritesAttribute(attribute = "value", description = "some value") })
+public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
+
+ public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
+ public static final String RESULT_QUERY_DURATION = "plc4x.read.query.duration";
+ public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.read.query.executiontime";
+ public static final String RESULT_QUERY_FETCH_TIME = "plc4x.read.query.fetchtime";
+ public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+ public static final String RESULT_ERROR_MESSAGE = "plc4x.read.error.message";
+
+ public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+   .name("plc4x-record-writer").displayName("Record Writer")
+   .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+     + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+   .identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
+
+ public static final PropertyDescriptor FORCE_RECONNECT = new PropertyDescriptor.Builder()
+   .name("plc4x-reconnect-force").displayName("Force Reconnect every request")
+   .description("Specifies if the connection to the PLC will be recreated on each trigger event")
+   .required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("true").build();
+
+ public Plc4xSourceRecordProcessor() {
+
+ }
+
+ private PlcConnection connection = null;
+ private PooledPlcDriverManager pool;
+
+ private static PLC4X_PROTOCOL PROTOCOL = null;
+
+ @Override
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+   super.onScheduled(context);
+   //TODO: Change this to use a NiFi service instead of a direct connection, and add an @OnStopped phase to close the connection
+   try {
+     Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+     pool = new PooledPlcDriverManager();
+     if (!force) {
+       this.connection = pool.getConnection(getConnectionString());
+     }
+     //TODO: how to infer the protocol within the writer?
+     PROTOCOL = Plc4xCommon.getConnectionProtocol(getConnectionString());
+   } catch (PlcConnectionException e) {
+     if (this.connection != null) {
+       try {
+         this.connection.close();
+       } catch (Exception e1) {
+         //do nothing
+       }
+     }
+     getLogger().error("Error while creating the connection to " + getConnectionString(), e);
+   }
+ }
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+   super.init(context);
+   final Set<Relationship> r = new HashSet<>();
+   r.addAll(super.getRelationships());
+   this.relationships = Collections.unmodifiableSet(r);
+
+   final List<PropertyDescriptor> pds = new ArrayList<>();
+   pds.addAll(super.getSupportedPropertyDescriptors());
+   pds.add(RECORD_WRITER_FACTORY);
+   pds.add(FORCE_RECONNECT);
+   this.descriptors = Collections.unmodifiableList(pds);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+   FlowFile fileToProcess = null;
+   //TODO: In the future the processor will be configurable to get the address and the connection from the incoming flowfile
+   if (context.hasIncomingConnection()) {
+     fileToProcess = session.get();
+     // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+     // However, if we have no FlowFile and we have connections coming from other
+     // Processors, then we know that we should run only if we have a FlowFile.
+     if (fileToProcess == null && context.hasNonLoopConnection()) {
+       return;
+     }
+   }
+
+   final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+   Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+   Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+   final ComponentLog logger = getLogger();
+   logger.info("The connection {} will be recreated? (Force Reconnect every request) is {}", new Object[] { getConnectionString(), force });
+   // Get an instance of a component able to read from a PLC.
+   // TODO: Change this to use a NiFi service instead of a direct connection
+   final AtomicLong nrOfRows = new AtomicLong(0L);
+   final StopWatch executeTime = new StopWatch(true);
+   try {
+     if (force) {
+       logger.debug("Recreating the connection {} because the parameter (Force Reconnect every request) is {}", new Object[] { getConnectionString(), force });
+       this.connection = pool.getConnection(getConnectionString());
+     }
+     if (this.connection != null) {
+       // Prepare the request.
+       if (!connection.getMetadata().canRead()) {
+         throw new ProcessException("Reading not supported by connection");
+       }
+       String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
+       Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
+       FlowFile resultSetFF;
+       if (fileToProcess == null) {
+         resultSetFF = session.create();
+       } else {
+         resultSetFF = session.create(fileToProcess);
+       }
+       if (inputFileAttrMap != null) {
+         resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
+       }
+       try {
+         PlcReadRequest.Builder builder = connection.readRequestBuilder();
+         getFields().forEach(field -> {
+           String address = getAddress(field);
+           if (address != null) {
+             builder.addItem(field, address);
+           }
+         });
+         PlcReadRequest readRequest = builder.build();
+         PlcReadResponse readResponse = readRequest.execute().get(10, TimeUnit.SECONDS);
+         resultSetFF = session.write(resultSetFF, out -> {
+           try {
+             nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, this.getPlcAddress(), out, logger, null, PROTOCOL));
+           } catch (Exception e) {
+             throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+           }
+         });
+         long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
+         final Map<String, String> attributesToAdd = new HashMap<>();
+         attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+         attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+         if (inputFileUUID != null) {
+           attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
+         }
+         attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+         resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+         plc4xWriter.updateCounters(session);
+         logger.info("{} contains {} records; transferring to 'success'", new Object[] { resultSetFF, nrOfRows.get() });
+         // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
+         if (context.hasIncomingConnection()) {
+           session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+         } else {
+           session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+         }
+         resultSetFlowFiles.add(resultSetFF);
+         if (resultSetFlowFiles.size() >= 0) {
+           session.transfer(resultSetFlowFiles, super.REL_SUCCESS);
+           // Need to remove the original input file if it exists
+           if (fileToProcess != null) {
+             session.remove(fileToProcess);
+             fileToProcess = null;
+           }
+           session.commit();
+           resultSetFlowFiles.clear();
+         }
+       } catch (Exception e) {
Review comment:
This logic around the try/catches seems involved; perhaps a comment explaining what you are doing and why would help maintainability.
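To make the reviewer's point concrete, here is a minimal, self-contained sketch of the suggested restructuring: extract the step that can fail into a helper so the caller keeps one linear try with a single, documented catch. All names here (`ReadFlow`, `readRows`, `ReadFailure`) are invented for illustration and are not part of the PR.

```java
// Hypothetical sketch (not the PR's code): flattening nested try/catch by
// extracting the risky step into a helper and documenting each catch's intent.
class ReadFlow {

    // Domain-specific wrapper so callers can catch one well-named type.
    static class ReadFailure extends RuntimeException {
        ReadFailure(String msg, Throwable cause) { super(msg, cause); }
    }

    // The helper isolates the step that can fail, so the caller's
    // control flow stays linear instead of nesting try blocks.
    static int readRows(boolean fail) {
        if (fail) {
            throw new IllegalStateException("device unreachable");
        }
        return 3; // pretend three rows were read
    }

    // Caller: one try, one catch whose purpose is stated in a comment.
    static int process(boolean fail) {
        try {
            return readRows(fail);
        } catch (RuntimeException e) {
            // Intent: surface all read errors uniformly; the caller
            // routes a ReadFailure to the failure relationship.
            throw new ReadFailure("PLC read failed", e);
        }
    }
}
```

The same shape could apply to the processor's `onTrigger`: one helper per risky step (connect, read, write), each catch annotated with why it exists and where the FlowFile goes afterwards.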
##########
File path: plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
##########
@@ -0,0 +1,269 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.nifi;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
+import org.apache.plc4x.nifi.record.RecordPlc4xWriter;
+import org.apache.plc4x.nifi.util.PLC4X_PROTOCOL;
+import org.apache.plc4x.nifi.util.Plc4xCommon;
+
+@Tags({ "plc4x-source" })
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X")
+@WritesAttributes({ @WritesAttribute(attribute = "value", description = "some value") })
+public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
+
+ public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
+ public static final String RESULT_QUERY_DURATION = "plc4x.read.query.duration";
+ public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.read.query.executiontime";
+ public static final String RESULT_QUERY_FETCH_TIME = "plc4x.read.query.fetchtime";
+ public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+ public static final String RESULT_ERROR_MESSAGE = "plc4x.read.error.message";
+
+ public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+   .name("plc4x-record-writer").displayName("Record Writer")
+   .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+     + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+   .identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
+
+ public static final PropertyDescriptor FORCE_RECONNECT = new PropertyDescriptor.Builder()
+   .name("plc4x-reconnect-force").displayName("Force Reconnect every request")
+   .description("Specifies if the connection to the PLC will be recreated on each trigger event")
+   .required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("true").build();
+
+ public Plc4xSourceRecordProcessor() {
+
+ }
+
+ private PlcConnection connection = null;
+ private PooledPlcDriverManager pool;
+
+ private static PLC4X_PROTOCOL PROTOCOL = null;
+
+ @Override
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+   super.onScheduled(context);
+   //TODO: Change this to use a NiFi service instead of a direct connection, and add an @OnStopped phase to close the connection
+   try {
+     Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+     pool = new PooledPlcDriverManager();
+     if (!force) {
+       this.connection = pool.getConnection(getConnectionString());
+     }
+     //TODO: how to infer the protocol within the writer?
+     PROTOCOL = Plc4xCommon.getConnectionProtocol(getConnectionString());
+   } catch (PlcConnectionException e) {
+     if (this.connection != null) {
+       try {
+         this.connection.close();
+       } catch (Exception e1) {
+         //do nothing
+       }
+     }
+     getLogger().error("Error while creating the connection to " + getConnectionString(), e);
+   }
+ }
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+   super.init(context);
+   final Set<Relationship> r = new HashSet<>();
+   r.addAll(super.getRelationships());
+   this.relationships = Collections.unmodifiableSet(r);
+
+   final List<PropertyDescriptor> pds = new ArrayList<>();
+   pds.addAll(super.getSupportedPropertyDescriptors());
+   pds.add(RECORD_WRITER_FACTORY);
+   pds.add(FORCE_RECONNECT);
+   this.descriptors = Collections.unmodifiableList(pds);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+   FlowFile fileToProcess = null;
+   //TODO: In the future the processor will be configurable to get the address and the connection from the incoming flowfile
+   if (context.hasIncomingConnection()) {
+     fileToProcess = session.get();
+     // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+     // However, if we have no FlowFile and we have connections coming from other
+     // Processors, then we know that we should run only if we have a FlowFile.
+     if (fileToProcess == null && context.hasNonLoopConnection()) {
+       return;
+     }
+   }
+
+   final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+   Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+   Boolean force = context.getProperty(FORCE_RECONNECT).asBoolean();
+   final ComponentLog logger = getLogger();
+   logger.info("The connection {} will be recreated? (Force Reconnect every request) is {}", new Object[] { getConnectionString(), force });
+   // Get an instance of a component able to read from a PLC.
+   // TODO: Change this to use a NiFi service instead of a direct connection
+   final AtomicLong nrOfRows = new AtomicLong(0L);
+   final StopWatch executeTime = new StopWatch(true);
+   try {
+     if (force) {
+       logger.debug("Recreating the connection {} because the parameter (Force Reconnect every request) is {}", new Object[] { getConnectionString(), force });
+       this.connection = pool.getConnection(getConnectionString());
+     }
+     if (this.connection != null) {
+       // Prepare the request.
+       if (!connection.getMetadata().canRead()) {
+         throw new ProcessException("Reading not supported by connection");
+       }
+       String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
+       Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
+       FlowFile resultSetFF;
+       if (fileToProcess == null) {
+         resultSetFF = session.create();
+       } else {
+         resultSetFF = session.create(fileToProcess);
+       }
+       if (inputFileAttrMap != null) {
+         resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
+       }
+       try {
+         PlcReadRequest.Builder builder = connection.readRequestBuilder();
+         getFields().forEach(field -> {
+           String address = getAddress(field);
+           if (address != null) {
+             builder.addItem(field, address);
+           }
+         });
+         PlcReadRequest readRequest = builder.build();
+         PlcReadResponse readResponse = readRequest.execute().get(10, TimeUnit.SECONDS);
+         resultSetFF = session.write(resultSetFF, out -> {
+           try {
+             nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, this.getPlcAddress(), out, logger, null, PROTOCOL));
+           } catch (Exception e) {
+             throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+           }
+         });
+         long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
+         final Map<String, String> attributesToAdd = new HashMap<>();
+         attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+         attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+         if (inputFileUUID != null) {
+           attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
+         }
+         attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+         resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+         plc4xWriter.updateCounters(session);
+         logger.info("{} contains {} records; transferring to 'success'", new Object[] { resultSetFF, nrOfRows.get() });
+         // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
+         if (context.hasIncomingConnection()) {
+           session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+         } else {
+           session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+         }
+         resultSetFlowFiles.add(resultSetFF);
+         if (resultSetFlowFiles.size() >= 0) {
+           session.transfer(resultSetFlowFiles, super.REL_SUCCESS);
+           // Need to remove the original input file if it exists
+           if (fileToProcess != null) {
+             session.remove(fileToProcess);
+             fileToProcess = null;
+           }
+           session.commit();
+           resultSetFlowFiles.clear();
+         }
+       } catch (Exception e) {
Review comment:
English please
##########
File path: plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/SandBox.java
##########
@@ -0,0 +1,47 @@
+package org.apache.plc4x.processors.plc4x4nifi;
+
Review comment:
what is this?
##########
File path: plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
##########
@@ -30,14 +30,14 @@ Licensed to the Apache Software Foundation (ASF) under one
public abstract class BasePlc4xProcessor extends AbstractProcessor {
Review comment:
Also, what if we want custom validation per derived class?
If the connection strings are common to all drivers and can all be validated the same way, this is OK; if not, then this needs to be per processor.
The same applies to the address.
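One hypothetical shape for the per-driver validation the reviewer asks about (the class and pattern below are invented for illustration; they are not PLC4X or NiFi API): each derived processor supplies the scheme of its own driver, and the shared base delegates to the scheme-specific check.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: each derived processor supplies its own scheme,
// so connection-string validation is per-driver rather than one generic check.
class SchemeValidator {

    private final Pattern pattern;

    SchemeValidator(String scheme) {
        // e.g. scheme "s7" yields a pattern matching "s7://<non-empty rest>"
        this.pattern = Pattern.compile("^" + Pattern.quote(scheme) + "://\\S+$");
    }

    boolean isValid(String connectionString) {
        return connectionString != null && pattern.matcher(connectionString).matches();
    }
}
```

In NiFi terms this would likely surface as a custom `Validator` on the connection-string `PropertyDescriptor`, with the base processor exposing a hook (e.g. an overridable method returning the validator) that each driver-specific subclass implements; address validation could follow the same pattern.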
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]