This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/s7strings
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/feature/s7strings by this push:
new 3c5266c490 fix: Major refactoring of mostly the write code.
3c5266c490 is described below
commit 3c5266c4901ce0882165e07f5b1d2d7ef01810c2
Author: Christofer Dutz <[email protected]>
AuthorDate: Thu Nov 30 19:37:27 2023 +0100
fix: Major refactoring of mostly the write code.
---
.../s7/readwrite/protocol/S7ProtocolLogic.java | 673 ++++++++++-----------
1 file changed, 324 insertions(+), 349 deletions(-)
diff --git
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index b450981b40..4b176256e1 100644
---
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -22,7 +22,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.plc4x.java.api.exceptions.PlcInvalidTagException;
import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
@@ -102,16 +101,6 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
private final S7PlcSubscriptionHandle almHandle = new
S7PlcSubscriptionHandle(EventType.ALM, EventLogic);
//private final S7PlcSubscriptionHandle cycHandle = new
S7PlcSubscriptionHandle(EventType.CYC, EventLogic);
- /*
- * For the reconnection functionality by a "TimeOut" of the connection,
- * you must keep track of open transactions. In general, an S7 device
- * supports a couple of simultaneous requests.
- * The rhythm of execution must be determined by the TransactionManager.
- * So far it is the way to indicate to the user that he must redo
- * his request.
- */
- private final HashMap<CompletableFuture<S7Message>,
MutablePair<RequestTransactionManager.RequestTransaction,
CompletableFuture<PlcReadResponse>>> activeRequests = new HashMap<>();
-
/*
* This array stores the cyclic subscription requests between the driver
* and the PLC. The purpose is to document the tags associated with the
@@ -149,10 +138,6 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
return;
}
- // If the call is for a reconnection, we must clean up
- // the queued messages so that they release the transaction handler.
- cleanFutures();
-
//Set feature for all handlers in the pipeline from
//the driver configuration.
setChannelFeatures();
@@ -233,33 +218,28 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
logger.debug("Got S7 Identification Response");
S7PayloadUserData payloadUserData =
(S7PayloadUserData) messageUserData.getPayload();
extractControllerTypeAndFireConnected(context,
payloadUserData);
- cleanFutures();
});
});
});
}
-
/*
* It performs the sequential and safe shutdown of the driver.
* Completion of pending requests, executors and associated tasks.
*/
@Override
public void onDisconnect(ConversationContext<TPKTPacket> context) {
- // 1. Clear all pending requests and their associated transaction
- cleanFutures();
- // 2. Here we shut down the local task executor.
+ // 1. Here we shut down the local task executor.
clientExecutorService.shutdown();
- // 3. Performs the shutdown of the transaction executor.
+ // 2. Performs the shutdown of the transaction executor.
tm.shutdown();
- // 4. Finish the execution of the tasks for the handling of Events.
+ // 3. Finish the execution of the tasks for the handling of Events.
EventLogic.stop();
- // 5. Executes the closing of the main channel.
+ // 4. Executes the closing of the main channel.
context.getChannel().close();
- // 6. Here is the stop of any task or state machine that is added.
+ // 5. Here is the stop of any task or state machine that is added.
}
-
@Override
public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest)
{
// If we're not connected, just abort with an error.
@@ -270,61 +250,55 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
}
DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
- S7Message s7Message;
+ CompletableFuture<S7Message> responseFuture;
if (request.getTagNames().stream().anyMatch(t -> request.getTag(t)
instanceof S7SzlTag)) {
S7SzlTag szlTag = (S7SzlTag) request.getTags().get(0);
// TODO: Is the tpduReference of 1 correct here?
- s7Message = new S7MessageUserData(1, new
S7ParameterUserData(List.of(
- new S7ParameterUserDataItemCPUFunctions((short) 0x11,
(byte) 0x4, (byte) 0x4, (short) 0x01, (short) 0x00, null, null, null)
- )), new S7PayloadUserData(List.of(
+ S7Message s7Message = new S7MessageUserData(getTpduId(),
+ new S7ParameterUserData(List.of(
+ new S7ParameterUserDataItemCPUFunctions(
+ (short) 0x11, (byte) 0x4, (byte) 0x4, (short) 0x01,
(short) 0x00,
+ null, null, null)
+ )),
+ new S7PayloadUserData(List.of(
new
S7PayloadUserDataItemCpuFunctionReadSzlRequest(DataTransportErrorCode.OK,
- DataTransportSize.OCTET_STRING,
- 0x04,
- new SzlId(SzlModuleTypeClass.enumForValue((byte)
((szlTag.getSzlId() & 0xf000) >> 12)),
- (byte) ((szlTag.getSzlId() & 0x0f00) >> 8),
- SzlSublist.enumForValue((short)
(szlTag.getSzlId() & 0x00ff))),
+ DataTransportSize.OCTET_STRING,
+ 0x04,
+ new SzlId(SzlModuleTypeClass.enumForValue((byte)
((szlTag.getSzlId() & 0xf000) >> 12)),
+ (byte) ((szlTag.getSzlId() & 0x0f00) >> 8),
+ SzlSublist.enumForValue((short) (szlTag.getSzlId()
& 0x00ff))),
szlTag.getIndex())
- )));
- }
-
- else if (request.getTagNames().stream().anyMatch(t ->
request.getTag(t) instanceof S7AckTag)) {
- s7Message = encodeAlarmAckRequest(request);
- }
-
- else if (request.getTagNames().stream().anyMatch(t ->
request.getTag(t) instanceof S7ClkTag)) {
- s7Message = encodePlcClkRequest(request);
+ )));
+ responseFuture = sendInternal(s7Message);
+ } else if (request.getTagNames().stream().anyMatch(t ->
request.getTag(t) instanceof S7AckTag)) {
+ responseFuture = performAlarmAckRequest(request);
+ } else if (request.getTagNames().stream().anyMatch(t ->
request.getTag(t) instanceof S7ClkTag)) {
+ responseFuture = performClkRequest(request);
}
// If the request contains at least one var-length string field, we
need to get the real length first.
else if (request.getTagNames().stream().anyMatch(t ->
request.getTag(t) instanceof S7StringVarLengthTag)) {
- s7Message = encodePlcVarLengthStringReadRequest(request);
+ responseFuture = performVarLengthStringReadRequest(request);
}
// This is a "normal" read request.
else {
- s7Message = encodePlcReadRequest(request);
+ responseFuture = performOrdinaryReadRequest(request);
}
// Just send a single response and chain it as Response
- return toPlcReadResponse(readRequest, readInternal(s7Message));
+ return toPlcReadResponse(readRequest, responseFuture);
}
/**
* Maps the S7ReadResponse of a PlcReadRequest to a PlcReadResponse
*/
- private CompletableFuture<PlcReadResponse>
toPlcReadResponse(PlcReadRequest readRequest, CompletableFuture<S7Message>
response) {
+ private CompletableFuture<PlcReadResponse>
toPlcReadResponse(PlcReadRequest readRequest, CompletableFuture<S7Message>
responseFuture) {
CompletableFuture<PlcReadResponse> clientFuture = new
CompletableFuture<>();
- activeRequests.get(response).setRight(clientFuture);
try {
- clientExecutorService.execute(() -> {
- try {
- PlcReadResponse plcItems = (PlcReadResponse)
decodeReadResponse(response.get(), readRequest);
- clientFuture.complete(plcItems);
- } catch (Exception ex) {
- logger.info(ex.toString());
- }
- });
+ PlcReadResponse response = (PlcReadResponse)
decodeReadResponse(responseFuture.get(), readRequest);
+ clientFuture.complete(response);
} catch (Exception ex) {
logger.info(ex.toString());
}
@@ -332,88 +306,47 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
return clientFuture;
}
- /**
- * Sends one Read over the Wire and internally returns the Response
- * Do sending of normally sized single-message request.
- * <p>
- * Assumes that the {@link S7MessageRequest} and its expected {@link
S7MessageResponseData}
- * and does not further check that!
- */
- private CompletableFuture<S7Message> readInternal(S7Message request) {
- CompletableFuture<S7Message> future = new CompletableFuture<>();
-
- int tpduId = request.getTpduReference();
-
- TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
request, true, (byte) tpduId));
-
- // Start a new request-transaction (Is ended in the response-handler)
-
- RequestTransactionManager.RequestTransaction transaction =
tm.startRequest();
-
- transaction.submit(() -> context.sendRequest(tpktPacket)
- .onTimeout(new TransactionErrorCallback<>(future, transaction))
- .onError(new TransactionErrorCallback<>(future, transaction))
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> (COTPPacketData) p.getPayload())
- .check(p -> p.getPayload() != null)
- .unwrap(COTPPacket::getPayload)
- .check(p -> p.getTpduReference() == tpduId)
- .handle(p -> {
- activeRequests.remove(future);
- future.complete(p);
- // Finish the request-transaction.
- transaction.endRequest();
- }));
- activeRequests.put(future, new MutablePair<>(transaction, null));
-
- return future;
- }
-
@Override
public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest
writeRequest) {
- CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
- DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
-
- int tpduId = getTpduId();
+ // If we're not connected, just abort with an error.
+ if (!isConnected()) {
+ CompletableFuture<PlcWriteResponse> future = new
CompletableFuture<>();
+ future.completeExceptionally(new
PlcRuntimeException("Disconnected"));
+ return future;
+ }
- TPKTPacket tpktPacket;
+ DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
+ CompletableFuture<S7Message> responseFuture = new
CompletableFuture<>();
// TODO: Write one or two lines on what happens here ... to me it
looks as if there's at least on S7ClkTag, then all is handled by the writeClk
method, but what happens if a request would contain mixed tag types?
if (request.getTagNames().stream().anyMatch(t -> request.getTag(t)
instanceof S7ClkTag)) {
- tpktPacket = encodePlcClkSetRequest((DefaultPlcWriteRequest)
writeRequest, tpduId);
+ responseFuture = performClkSetRequest((DefaultPlcWriteRequest)
writeRequest);
}
// If the list of tags contains at least one STRING/WSTRING element,
// we need to check the sizes of the string fields in a first request.
else if (request.getTagNames().stream().anyMatch(t ->
request.getTag(t) instanceof S7StringVarLengthTag)) {
- tpktPacket = encodePlcStringWriteRequest((DefaultPlcWriteRequest)
writeRequest, tpduId);
- }
+ responseFuture =
performVarLengthStringWriteRequest((DefaultPlcWriteRequest) writeRequest);
+ }
// This is a request only containing ordinary tags
else {
- tpktPacket = encodeOrdinaryWriteRequest(request, tpduId);
+ responseFuture = performOrdinaryWriteRequest(request);
}
- // Start a new request-transaction (Is ended in the response-handler)
- RequestTransactionManager.RequestTransaction transaction =
tm.startRequest();
- transaction.submit(() -> context.sendRequest(tpktPacket)
- .onTimeout(new TransactionErrorCallback<>(future, transaction))
- .onError(new TransactionErrorCallback<>(future, transaction))
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .unwrap(COTPPacket::getPayload)
- .check(p -> p.getTpduReference() == tpduId)
- .handle(p -> {
- try {
- future.complete(((PlcWriteResponse) decodeWriteResponse(p,
writeRequest)));
- } catch (PlcProtocolException e) {
- logger.warn("Error sending 'write' message: '{}'",
e.getMessage(), e);
- }
- // Finish the request-transaction.
- transaction.endRequest();
- }));
- return future;
+ return toPlcWriteResponse(writeRequest, responseFuture);
+ }
+
+ private CompletableFuture<PlcWriteResponse>
toPlcWriteResponse(PlcWriteRequest writeRequest, CompletableFuture<S7Message>
responseFuture) {
+ CompletableFuture<PlcWriteResponse> clientFuture = new
CompletableFuture<>();
+
+ try {
+ PlcWriteResponse response = (PlcWriteResponse)
decodeWriteResponse(responseFuture.get(), writeRequest);
+ clientFuture.complete(response);
+ } catch (Exception ex) {
+ logger.info(ex.toString());
+ }
+
+ return clientFuture;
}
@Override
@@ -473,11 +406,11 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
break;
default:
}
- if(s7Message == null) {
+ if (s7Message == null) {
throw new PlcInvalidTagException("Unsupported tag of type: " +
tag.getTagType());
}
- TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
s7Message,true, (byte) tpduId));
+ TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
s7Message, true, (byte) tpduId));
// Start a new request-transaction (Is ended in the
response-handler)
RequestTransactionManager.RequestTransaction transaction =
tm.startRequest();
@@ -654,8 +587,8 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
}
private PlcSubscriptionResponse decodeEventSubscriptionResponse(String
strTagName,
-
PlcSubscriptionRequest plcSubscriptionRequest,
- S7Message
responseMessage)
+
PlcSubscriptionRequest plcSubscriptionRequest,
+ S7Message
responseMessage)
throws PlcProtocolException {
Map<String, ResponseItem<PlcSubscriptionHandle>> values = new
HashMap<>();
@@ -918,7 +851,7 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
return null;
}
- private S7Message encodeAlarmAckRequest(DefaultPlcReadRequest request) {
+ private CompletableFuture<S7Message>
performAlarmAckRequest(DefaultPlcReadRequest request) {
List<S7ParameterUserDataItem> parameterItems = new
ArrayList<>(request.getNumberOfTags());
List<S7PayloadUserDataItem> payloadItems = new
ArrayList<>(request.getNumberOfTags());
@@ -967,9 +900,9 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
messageObjects);
payloadItems.add(payload);
- return new S7MessageUserData(getTpduId(),
+ return sendInternal(new S7MessageUserData(getTpduId(),
new S7ParameterUserData(parameterItems),
- new S7PayloadUserData(payloadItems));
+ new S7PayloadUserData(payloadItems)));
}
private S7Message encodeAlarmQueryRequest(DefaultPlcSubscriptionRequest
request, int tpduId) {
@@ -988,7 +921,7 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
);
parameterItems.add(parameter);
- //TODO: Chequear el tipo dfe larma.
+ //TODO: Check the type of alarm
S7PayloadUserDataItemCpuFunctionAlarmQueryRequest payload =
new S7PayloadUserDataItemCpuFunctionAlarmQueryRequest(
DataTransportErrorCode.OK,
@@ -1210,9 +1143,9 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
}
/*
- *
- */
- private S7Message encodePlcClkRequest(DefaultPlcReadRequest request) {
+ *
+ */
+ private CompletableFuture<S7Message>
performClkRequest(DefaultPlcReadRequest request) {
List<S7ParameterUserDataItem> parameterItems = new
ArrayList<>(request.getNumberOfTags());
List<S7PayloadUserDataItem> payloadItems = new
ArrayList<>(request.getNumberOfTags());
@@ -1238,15 +1171,15 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
0x00);
payloadItems.add(payload);
- return new S7MessageUserData(getTpduId(),
+ return sendInternal(new S7MessageUserData(getTpduId(),
new S7ParameterUserData(parameterItems),
- new S7PayloadUserData(payloadItems));
+ new S7PayloadUserData(payloadItems)));
}
/*
- *
- */
- private TPKTPacket encodePlcClkSetRequest(DefaultPlcWriteRequest request,
int tpduId) {
+ *
+ */
+ private CompletableFuture<S7Message>
performClkSetRequest(DefaultPlcWriteRequest request) {
List<S7ParameterUserDataItem> parameterItems = new
ArrayList<>(request.getNumberOfTags());
List<S7PayloadUserDataItem> payloadItems = new
ArrayList<>(request.getNumberOfTags());
@@ -1272,15 +1205,17 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
tag.getDateAndTime());
payloadItems.add(payload);
- return new TPKTPacket(new COTPPacketData(null,
- new S7MessageUserData(tpduId,
+ return sendInternal(
+ new S7MessageUserData(getTpduId(),
new S7ParameterUserData(parameterItems),
- new S7PayloadUserData(payloadItems)),
- true, (byte) tpduId));
+ new S7PayloadUserData(payloadItems)));
}
/*
+ * Encoding of STRING types ... for WSTRING types the "Maximum length" and
+ * "Current length" are both 16 bit unsigned integer values:
+ *
* +-------------------+
* Byte n | Maximum length | (intMaxChars)
* +-------------------+
@@ -1298,114 +1233,59 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
* +-------------------+ |
* Byte ... | ... | /
* +-------------------+
+ *
* Reading text strings in two steps:
- * 1. For the operation on texts, you must first evaluate the
+ * 1. For the operation on texts, you must first evaluate the
* space created on DB of type STRING or WSTRING.
- * 2. The first two bytes have the maximum number of characters (bytes)
- * available to store text strings (intMaxChars) and the number of
+ * 2. The first two bytes have the maximum number of characters (bytes)
+ * available to store text strings (intMaxChars) and the number of
* characters available (intActualChars).
- * 3. In the specific case of reading, only the characters defined
+ * 3. In the specific case of reading, only the characters defined
* by "intActualChars" are recovered.
- * TODO: Maximum waiting time managed by system variables.
- */
- private S7Message
encodePlcVarLengthStringReadRequest(DefaultPlcReadRequest request) {
- List<S7ParameterReadVarRequest> parameterItems = new
ArrayList<>(request.getNumberOfTags());
+ */
+ private CompletableFuture<S7Message>
performVarLengthStringReadRequest(DefaultPlcReadRequest request) {
+ CompletableFuture<S7Message> future = new CompletableFuture<>();
- // Build a request to read the length information for every var-length
string in the request.
- List<S7StringVarLengthTag> varLengthStringTags =
request.getTags().stream()
- .filter(plcTag -> plcTag instanceof S7StringVarLengthTag)
- .map(plcTag -> (S7StringVarLengthTag) plcTag)
- .collect(Collectors.toList());
- List<S7VarRequestParameterItem> stringFields = new
ArrayList<>(varLengthStringTags.size());
- for (S7StringVarLengthTag varLengthStringTag : varLengthStringTags) {
- // For STRING, the header is 2 bytes (first byte contains the max
length and the second the actual length)
- if(varLengthStringTag.getDataType() == TransportSize.STRING) {
- stringFields.add(new S7VarRequestParameterItemAddress(
- new S7AddressAny(
- TransportSize.BYTE,
- 2,
- varLengthStringTag.getBlockNumber(),
- MemoryArea.DATA_BLOCKS,
- varLengthStringTag.getByteOffset(),
- varLengthStringTag.getBitOffset()
- )));
- }
- // For WSTRING, the header is 4 bytes (first word contains the max
length and the second the actual length)
- else if(varLengthStringTag.getDataType() == TransportSize.WSTRING)
{
- stringFields.add(new S7VarRequestParameterItemAddress(
- new S7AddressAny(
- TransportSize.BYTE,
- 4,
- varLengthStringTag.getBlockNumber(),
- MemoryArea.DATA_BLOCKS,
- varLengthStringTag.getByteOffset(),
- varLengthStringTag.getBitOffset()
- )));
+ // Resolve the lengths of all var-length string fields in the request.
+ CompletableFuture<Map<S7StringVarLengthTag, StringSizes>>
stringSizesFuture = getStringSizes(request);
+ stringSizesFuture.whenComplete((s7StringVarLengthTagStringSizesMap,
throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(new PlcProtocolException("Error
resolving string sizes", throwable));
} else {
- throw new PlcInvalidTagException("Only STRING and WSTRING
allowed here.");
- }
- }
- final S7MessageRequest readRequest = new S7MessageRequest(
- getTpduId(), new S7ParameterReadVarRequest(stringFields), null);
-
- // Read the max length and actual size for each of the var-length
strings.
- CompletableFuture<S7Message> future = readInternal(readRequest);
-
- Map<S7StringVarLengthTag, Integer> stringLengths = new
HashMap<>(varLengthStringTags.size());
- try {
- // TODO: Check if we're not blocking the execute method here ...
- S7Message getLengthsResponseMessage = future.get(2000,
TimeUnit.MILLISECONDS);
- S7PayloadReadVarResponse getLengthsResponse =
(S7PayloadReadVarResponse) getLengthsResponseMessage.getPayload();
- int curItemIndex = 0;
- for (S7StringVarLengthTag varLengthStringTag :
varLengthStringTags) {
- S7VarPayloadDataItem s7VarPayloadDataItem =
getLengthsResponse.getItems().get(curItemIndex);
- ReadBufferByteBased readBuffer = new
ReadBufferByteBased(s7VarPayloadDataItem.getData());
- try {
- if (varLengthStringTag.getDataType() ==
TransportSize.STRING) {
- /*int maxChars =
*/readBuffer.readUnsignedInt("maxLength", 8);
- int actualChars =
readBuffer.readUnsignedInt("maxLength", 8);
- stringLengths.put(varLengthStringTag, actualChars);
- } else if (varLengthStringTag.getDataType() ==
TransportSize.WSTRING) {
- /*int maxChars =
*/readBuffer.readUnsignedInt("maxLength", 16);
- int actualChars =
readBuffer.readUnsignedInt("maxLength", 16);
- stringLengths.put(varLengthStringTag, actualChars);
+ // Create an alternative list of request items, where all
var-length string tags are replaced with
+ // fixed-length string tags using the string length returned
by the previous request.
+ LinkedHashMap<String, PlcTag> updatedRequestItems = new
LinkedHashMap<>(request.getNumberOfTags());
+ for (String tagName : request.getTagNames()) {
+ PlcTag tag = request.getTag(tagName);
+ if (tag instanceof S7StringVarLengthTag) {
+ S7StringVarLengthTag varLengthTag =
(S7StringVarLengthTag) tag;
+ int stringLength =
s7StringVarLengthTagStringSizesMap.get(varLengthTag).getCurLength();
+ S7StringFixedLengthTag newTag = new
S7StringFixedLengthTag(varLengthTag.getDataType(), varLengthTag.getMemoryArea(),
+ varLengthTag.getBlockNumber(),
varLengthTag.getByteOffset(), varLengthTag.getBitOffset(),
+ varLengthTag.getNumberOfElements(), stringLength);
+ updatedRequestItems.put(tagName, newTag);
} else {
- throw new PlcInvalidTagException("Only STRING and
WSTRING allowed here.");
+ updatedRequestItems.put(tagName, tag);
}
- } catch (ParseException e) {
- throw new PlcInvalidTagException("Error reading var-length
string actual lengths.");
}
- }
- } catch (InterruptedException ex) {
- logger.info(ex.getMessage());
- Thread.currentThread().interrupt();
- } catch (ExecutionException | TimeoutException ex) {
- logger.info(ex.getMessage());
- }
- // Create an alternative list of request items, where all var-length
string tags are replaced with
- // fixed-length string tags using the string length returned by the
previous request.
- LinkedHashMap<String, PlcTag> updatedRequestItems = new
LinkedHashMap<>(request.getNumberOfTags());
- for (String tagName : request.getTagNames()) {
- PlcTag tag = request.getTag(tagName);
- if(tag instanceof S7StringVarLengthTag) {
- S7StringVarLengthTag varLengthTag = (S7StringVarLengthTag) tag;
- int stringLength = stringLengths.get(varLengthTag);
- S7StringFixedLengthTag newTag = new
S7StringFixedLengthTag(varLengthTag.getDataType(), varLengthTag.getMemoryArea(),
- varLengthTag.getBlockNumber(),
varLengthTag.getByteOffset(), varLengthTag.getBitOffset(),
- varLengthTag.getNumberOfElements(), stringLength);
- updatedRequestItems.put(tagName, newTag);
- } else {
- updatedRequestItems.put(tagName, tag);
+ // Use the normal functionality to execute the read request.
+ // TODO: Here technically the request object in the response
will not match the original request.
+ CompletableFuture<S7Message> s7MessageCompletableFuture =
performOrdinaryReadRequest(new DefaultPlcReadRequest(request.getReader(),
updatedRequestItems));
+ s7MessageCompletableFuture.whenComplete((s7Message,
throwable1) -> {
+ if (throwable1 != null) {
+ future.completeExceptionally(throwable1);
+ } else {
+ future.complete(s7Message);
+ }
+ });
}
- }
+ });
- // Use the normal functionality to execute the read request.
- // TODO: Here technically the request object in the response will not
match the original request.
- return encodePlcReadRequest(new
DefaultPlcReadRequest(request.getReader(), updatedRequestItems));
+ return future;
}
- private S7Message encodePlcReadRequest(DefaultPlcReadRequest request) {
+ private CompletableFuture<S7Message>
performOrdinaryReadRequest(DefaultPlcReadRequest request) {
// Convert each tag in the request into a corresponding item used in
the S7 protocol.
List<S7VarRequestParameterItem> requestItems = new
ArrayList<>(request.getNumberOfTags());
for (PlcTag tag : request.getTags()) {
@@ -1414,13 +1294,18 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
// Create a read request template.
// tpuId will be inserted before sending in #readInternal, so we
insert -1 as dummy here
- return new S7MessageRequest(getTpduId(),
+ S7Message requestMessage = new S7MessageRequest(getTpduId(),
new S7ParameterReadVarRequest(requestItems),
null);
+
+ return sendInternal(requestMessage);
}
/*
+ * Encoding of STRING types ... for WSTRING types the "Maximum length" and
+ * "Current length" are both 16 bit unsigned integer values:
+ *
* +-------------------+
* Byte n | Maximum length | (intMaxChars)
* +-------------------+
@@ -1438,18 +1323,18 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
* +-------------------+ |
* Byte ... | ... | /
* +-------------------+
+ *
* Reading text strings in two steps:
- * 1. For the operation on texts, you must first evaluate the
+ * 1. For the operation on texts, you must first evaluate the
* space created on DB of type STRING or WSTRING.
- * 2. The first two bytes have the maximum number of characters (bytes)
- * available to store text strings (intMaxChars) and the number of
+ * 2. The first two bytes have the maximum number of characters (bytes)
+ * available to store text strings (intMaxChars) and the number of
* characters available (intActualChars).
- * 3. In the specific case of write string, only the max characters
defined
- * by "intMaxChars" are writed.
+ * 3. In the specific case of write string, only the max characters defined
+ * by "intMaxChars" are written.
* TODO: Maximum waiting time managed by system variables.
- */
- private TPKTPacket encodePlcStringWriteRequest(DefaultPlcWriteRequest
request,
- int tpduId) {
+ */
+ private CompletableFuture<S7Message>
performVarLengthStringWriteRequest(DefaultPlcWriteRequest request) {
List<S7VarRequestParameterItem> parameterItems = new
ArrayList<>(request.getNumberOfTags());
List<S7VarPayloadDataItem> payloadItems = new
ArrayList<>(request.getNumberOfTags());
@@ -1457,34 +1342,34 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
int intActualChars = 0;
final S7StringVarLengthTag tag = (S7StringVarLengthTag)
request.getTags().get(0);
-
+
//Read the max length and actual size.
// TODO: Is the tpduId of 1 correct here?
- final S7MessageRequest readRequest = new S7MessageRequest(1,new
S7ParameterReadVarRequest(
- List.of(new S7VarRequestParameterItemAddress(
- new S7AddressAny(
- TransportSize.BYTE,
- 2,
- tag.getBlockNumber(),
- MemoryArea.DATA_BLOCKS,
- tag.getByteOffset(),
- tag.getBitOffset()
- ))
-
- )), null);
-
- CompletableFuture<S7Message> future = readInternal(readRequest);
-
+ final S7MessageRequest readRequest = new S7MessageRequest(1, new
S7ParameterReadVarRequest(
+ List.of(new S7VarRequestParameterItemAddress(
+ new S7AddressAny(
+ TransportSize.BYTE,
+ 2,
+ tag.getBlockNumber(),
+ MemoryArea.DATA_BLOCKS,
+ tag.getByteOffset(),
+ tag.getBitOffset()
+ ))
+
+ )), null);
+
+ CompletableFuture<S7Message> future = sendInternal(readRequest);
+
try {
S7Message get = future.get(2000, TimeUnit.MILLISECONDS);
- final S7VarPayloadDataItem payload =
(S7VarPayloadDataItem)((S7PayloadReadVarResponse)
get.getPayload()).getItems().get(0);
+ final S7VarPayloadDataItem payload = (S7VarPayloadDataItem)
((S7PayloadReadVarResponse) get.getPayload()).getItems().get(0);
intMaxChars = Byte.toUnsignedInt(payload.getData()[0]);
//payload.getData()[0] & 0xFF
- intActualChars = Byte.toUnsignedInt(payload.getData()[1]);
+ intActualChars = Byte.toUnsignedInt(payload.getData()[1]);
} catch (InterruptedException ex) {
logger.info(ex.getMessage());
} catch (ExecutionException ex) {
logger.info(ex.getMessage());
- } catch (TimeoutException ex) {
+ } catch (TimeoutException ex) {
logger.info(ex.getMessage());
}
@@ -1498,49 +1383,42 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
tagName = iter.next();
final S7StringVarLengthTag tagRef = (S7StringVarLengthTag)
request.getTag(tagName);
plcValue = request.getPlcValue(tagName);
-
+
//Check if String
String strValue = plcValue.getString();
if (strValue.length() > intMaxChars) {
strValue = strValue.substring(0, intMaxChars);
plcValue = new PlcSTRING(strValue);
}
-
+
S7Address s7Address = new S7AddressAny(
- tagRef.getDataType().BYTE,
- strValue.length() + 2,
- tagRef.getBlockNumber(),
- tagRef.getMemoryArea(),
- tagRef.getByteOffset(),
- tagRef.getBitOffset());
-
- parameterItems.add(new
S7VarRequestParameterItemAddress(s7Address));
-
+ tagRef.getDataType().BYTE,
+ strValue.length() + 2,
+ tagRef.getBlockNumber(),
+ tagRef.getMemoryArea(),
+ tagRef.getByteOffset(),
+ tagRef.getBitOffset());
+
+ parameterItems.add(new
S7VarRequestParameterItemAddress(s7Address));
+
ByteBuffer byteBuffer = ByteBuffer.allocate(strValue.length() + 2);
byteBuffer.put((byte) intMaxChars);
byteBuffer.put((byte) strValue.length());
byteBuffer.put(strValue.getBytes());
-
+
DataTransportSize transportSize =
DataTransportSize.BYTE_WORD_DWORD;
-
+
payloadItems.add(new
S7VarPayloadDataItem(DataTransportErrorCode.OK, transportSize,
byteBuffer.array()/*, hasNext*/));
}
- return new TPKTPacket(
- new COTPPacketData(
- null,
- new S7MessageRequest(tpduId,
- new S7ParameterWriteVarRequest(parameterItems),
- new S7PayloadWriteVarRequest(payloadItems)
- ),
- true,
- (byte) tpduId
- )
- );
+ return sendInternal(
+ new S7MessageRequest(getTpduId(),
+ new S7ParameterWriteVarRequest(parameterItems),
+ new S7PayloadWriteVarRequest(payloadItems)
+ ));
}
- private TPKTPacket encodeOrdinaryWriteRequest(DefaultPlcWriteRequest
request,
- int tpduId) {
+ private CompletableFuture<S7Message>
performOrdinaryWriteRequest(DefaultPlcWriteRequest request) {
List<S7VarRequestParameterItem> parameterItems = new
ArrayList<>(request.getNumberOfTags());
List<S7VarPayloadDataItem> payloadItems = new
ArrayList<>(request.getNumberOfTags());
@@ -1551,17 +1429,47 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
payloadItems.add(serializePlcValue(tag, plcValue));
}
- return new TPKTPacket(
- new COTPPacketData(
- null,
- new S7MessageRequest(tpduId,
- new S7ParameterWriteVarRequest(parameterItems),
- new S7PayloadWriteVarRequest(payloadItems)
- ),
- true,
- (byte) tpduId
- )
- );
+ return sendInternal(
+ new S7MessageRequest(getTpduId(),
+ new S7ParameterWriteVarRequest(parameterItems),
+ new S7PayloadWriteVarRequest(payloadItems)
+ ));
+ }
+
+ /**
+ * Sends a single S7 message over the wire and returns a future that completes with the response.
+ * Handles the sending of normally sized single-message requests.
+ * <p>
+ * Assumes that an {@link S7MessageRequest} is answered by its expected {@link S7MessageResponseData};
+ * apart from matching the TPDU reference, the response is not checked any further!
+ */
+ private CompletableFuture<S7Message> sendInternal(S7Message request) {
+ CompletableFuture<S7Message> future = new CompletableFuture<>();
+
+ // Get the tpduId from the S7 message.
+ int tpduId = request.getTpduReference();
+
+ TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
request, true, (byte) tpduId));
+
+ // Start a new request-transaction (Is ended in the response-handler)
+ RequestTransactionManager.RequestTransaction transaction =
tm.startRequest();
+ // Send the request.
+ transaction.submit(() -> context.sendRequest(tpktPacket)
+ .onTimeout(new TransactionErrorCallback<>(future, transaction))
+ .onError(new TransactionErrorCallback<>(future, transaction))
+ .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> (COTPPacketData) p.getPayload())
+ .check(p -> p.getPayload() != null)
+ .unwrap(COTPPacket::getPayload)
+ .check(p -> p.getTpduReference() == tpduId)
+ .handle(p -> {
+ future.complete(p);
+ // Finish the request-transaction.
+ transaction.endRequest();
+ }));
+
+ return future;
}
/**
@@ -1783,14 +1691,14 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
//TODO: Reassembling message.
if (responseMessage instanceof S7MessageResponseData) {
- for (String tagName : plcReadRequest.getTagNames()) {
+ for (String tagName : plcReadRequest.getTagNames()) {
if (plcReadRequest.getTag(tagName) instanceof
S7StringVarLengthTag) {
- PlcValue plcValue = null;
+ PlcValue plcValue = null;
PlcResponseCode responseCode =
PlcResponseCode.INTERNAL_ERROR;
ResponseItem<PlcValue> result = new
ResponseItem<>(responseCode, plcValue);
- values.put(tagName, result);
- }
- }
+ values.put(tagName, result);
+ }
+ }
} else if (responseMessage instanceof S7MessageUserData) {
S7PayloadUserData payload = (S7PayloadUserData)
responseMessage.getPayload();
@@ -1894,18 +1802,14 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
dt.getMsec() * 1000000)));
plcValue = new PlcList(plcValues);
}
-
+
ResponseItem<PlcValue> result = new
ResponseItem<>(responseCode, plcValue);
values.put(tagName, result);
index++;
}
-
-
return new DefaultPlcReadResponse(plcReadRequest, values);
-
}
-
// In all other cases all went well.
S7PayloadReadVarResponse payload = (S7PayloadReadVarResponse)
responseMessage.getPayload();
@@ -2152,32 +2056,6 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
s7Tag.getMemoryArea(), s7Tag.getByteOffset(),
s7Tag.getBitOffset());
}
- /*
- * In the case of a reconnection, there may be requests waiting,
- * for which the operation must be terminated by exception and canceled
- * in the transaction manager. If this does not happen,
- * the driver operation can be frozen.
- */
- private void cleanFutures() {
- //TODO: Debe ser ejecutado si la conexion esta levanta.
- activeRequests.forEach((f, p) -> {
- try {
- if (!f.isDone()) {
- logger.info("CF");
- f.cancel(true);
- logger.info("ClientCF");
- p.getRight().completeExceptionally(new
PlcRuntimeException("Disconnected"));
- logger.info("TM");
- p.getLeft().endRequest();
- }
-
- } catch (Exception ex) {
- logger.info(ex.toString());
- }
- });
- activeRequests.clear();
- }
-
private boolean isConnected() {
return context.getChannel().attr(S7HMuxImpl.IS_CONNECTED).get();
//return true;
@@ -2197,7 +2075,7 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
private boolean isFeatureSupported() {
return (s7DriverContext.getControllerType() ==
S7ControllerType.S7_300) ||
- (s7DriverContext.getControllerType() ==
S7ControllerType.S7_400);
+ (s7DriverContext.getControllerType() == S7ControllerType.S7_400);
}
private CompletableFuture<S7MessageUserData> reassembledMessage(short
sequenceNumber, List<PlcValue> plcValues) {
@@ -2230,12 +2108,12 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
*/
private TPKTPacket createSzlReassembledRequest(int tpduId, short
sequenceNumber) {
S7MessageUserData identifyRemoteMessage = new
S7MessageUserData(tpduId, new S7ParameterUserData(List.of(
- new S7ParameterUserDataItemCPUFunctions((short) 0x12, (byte)
0x4, (byte) 0x4, (short) 0x01, sequenceNumber, (short) 0x00, (short) 0x00, 0)
+ new S7ParameterUserDataItemCPUFunctions((short) 0x12, (byte) 0x4,
(byte) 0x4, (short) 0x01, sequenceNumber, (short) 0x00, (short) 0x00, 0)
)), new S7PayloadUserData(List.of(
- new S7PayloadUserDataItemCpuFunctionReadSzlNoDataRequest(
- DataTransportErrorCode.NOT_FOUND,
- DataTransportSize.NULL,
- 0x00)
+ new S7PayloadUserDataItemCpuFunctionReadSzlNoDataRequest(
+ DataTransportErrorCode.NOT_FOUND,
+ DataTransportSize.NULL,
+ 0x00)
)));
COTPPacketData cotpPacketData = new COTPPacketData(null,
identifyRemoteMessage, true, (byte) 2);
return new TPKTPacket(cotpPacketData);
@@ -2268,12 +2146,12 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
//TODO: S7PayloadUserDataItemCpuFunctionReadSzlNoDataRequest to
S7PayloadUserDataItemCpuFunctionAlarmQueryNoDataRequest
private TPKTPacket createAlarmQueryReassembledRequest(int tpduId, short
sequenceNumber) {
S7MessageUserData identifyRemoteMessage = new
S7MessageUserData(tpduId, new S7ParameterUserData(List.of(
- new S7ParameterUserDataItemCPUFunctions((short) 0x12, (byte)
0x4, (byte) 0x4, (short) 0x13, sequenceNumber, (short) 0x00, (short) 0x00, 0)
+ new S7ParameterUserDataItemCPUFunctions((short) 0x12, (byte) 0x4,
(byte) 0x4, (short) 0x13, sequenceNumber, (short) 0x00, (short) 0x00, 0)
)), new S7PayloadUserData(List.of(
- new S7PayloadUserDataItemCpuFunctionReadSzlNoDataRequest(
- DataTransportErrorCode.NOT_FOUND,
- DataTransportSize.NULL,
- 0x00)
+ new S7PayloadUserDataItemCpuFunctionReadSzlNoDataRequest(
+ DataTransportErrorCode.NOT_FOUND,
+ DataTransportSize.NULL,
+ 0x00)
)));
COTPPacketData cotpPacketData = new COTPPacketData(null,
identifyRemoteMessage, true, (byte) 2);
return new TPKTPacket(cotpPacketData);
@@ -2323,4 +2201,101 @@ public class S7ProtocolLogic extends
Plc4xProtocolBase<TPKTPacket> {
}
}
+ /**
+  * Reads the length header of every var-length string tag of the given request
+  * from the PLC, using one additional read request.
+  * <p>
+  * STRING and WSTRING values are prefixed with their max and their actual length.
+  *
+  * @param request request whose {@link S7StringVarLengthTag} tags should be resolved
+  * @return future completed with the max and actual length of every var-length string
+  * tag, or completed exceptionally if the lengths could not be read or parsed
+  * @throws PlcInvalidTagException if a var-length tag is neither STRING nor WSTRING
+  */
+ protected CompletableFuture<Map<S7StringVarLengthTag, StringSizes>> getStringSizes(PlcTagRequest request) {
+     CompletableFuture<Map<S7StringVarLengthTag, StringSizes>> future = new CompletableFuture<>();
+
+     // Build a request to read the length information for every var-length string in the request.
+     List<S7StringVarLengthTag> varLengthStringTags = request.getTags().stream()
+         .filter(plcTag -> plcTag instanceof S7StringVarLengthTag)
+         .map(plcTag -> (S7StringVarLengthTag) plcTag)
+         .collect(Collectors.toList());
+     List<S7VarRequestParameterItem> stringFields = new ArrayList<>(varLengthStringTags.size());
+     for (S7StringVarLengthTag varLengthStringTag : varLengthStringTags) {
+         // For STRING, the header is 2 bytes (first byte contains the max length and the second the actual length)
+         // For WSTRING, the header is 4 bytes (first word contains the max length and the second the actual length)
+         final int headerSizeInBytes;
+         if (varLengthStringTag.getDataType() == TransportSize.STRING) {
+             headerSizeInBytes = 2;
+         } else if (varLengthStringTag.getDataType() == TransportSize.WSTRING) {
+             headerSizeInBytes = 4;
+         } else {
+             throw new PlcInvalidTagException("Only STRING and WSTRING allowed here.");
+         }
+         stringFields.add(new S7VarRequestParameterItemAddress(
+             new S7AddressAny(
+                 TransportSize.BYTE,
+                 headerSizeInBytes,
+                 varLengthStringTag.getBlockNumber(),
+                 MemoryArea.DATA_BLOCKS,
+                 varLengthStringTag.getByteOffset(),
+                 varLengthStringTag.getBitOffset()
+             )));
+     }
+     final S7MessageRequest readRequest = new S7MessageRequest(
+         getTpduId(), new S7ParameterReadVarRequest(stringFields), null);
+
+     // Read the max length and actual size for each of the var-length strings.
+     CompletableFuture<S7Message> resolveSizesRequestFuture = sendInternal(readRequest);
+     resolveSizesRequestFuture.whenComplete((s7Message, throwable) -> {
+         if (throwable != null) {
+             future.completeExceptionally(new PlcProtocolException("Error resolving string sizes", throwable));
+             return;
+         }
+
+         // Any failure below has to fail the returned future, otherwise the caller would wait forever.
+         try {
+             Map<S7StringVarLengthTag, StringSizes> stringLengths = new HashMap<>(varLengthStringTags.size());
+             S7PayloadReadVarResponse getLengthsResponse = (S7PayloadReadVarResponse) s7Message.getPayload();
+             int curItemIndex = 0;
+             for (S7StringVarLengthTag varLengthStringTag : varLengthStringTags) {
+                 // The items are expected in the order the tags were requested in, so advance with every tag.
+                 S7VarPayloadDataItem s7VarPayloadDataItem = getLengthsResponse.getItems().get(curItemIndex++);
+                 ReadBufferByteBased readBuffer = new ReadBufferByteBased(s7VarPayloadDataItem.getData());
+                 // Only STRING and WSTRING get here, everything else was rejected while building the request.
+                 int lengthBits = (varLengthStringTag.getDataType() == TransportSize.STRING) ? 8 : 16;
+                 int maxChars = readBuffer.readUnsignedInt("maxLength", lengthBits);
+                 int actualChars = readBuffer.readUnsignedInt("actualLength", lengthBits);
+                 stringLengths.put(varLengthStringTag, new StringSizes(maxChars, actualChars));
+             }
+
+             future.complete(stringLengths);
+         } catch (Exception e) {
+             future.completeExceptionally(new PlcProtocolException("Error reading var-length string actual lengths.", e));
+         }
+     });
+
+     return future;
+ }
+
+ /** Pair of the max length and the current length of a var-length string. */
+ public static class StringSizes {
+     private final int max;
+     private final int current;
+
+     /** Creates a pair of the given max length and current length. */
+     public StringSizes(int maxLength, int curLength) {
+         max = maxLength;
+         current = curLength;
+     }
+     /** Returns the max length this instance was created with. */
+     public int getMaxLength() {
+         return max;
+     }
+     /** Returns the current length this instance was created with. */
+     public int getCurLength() {
+         return current;
+     }
+ }
+
}