This is an automated email from the ASF dual-hosted git repository. cgarcia pushed a commit to branch fix/s7async in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/fix/s7async by this push: new be8966a417 Modified the cyclical subscription system. TODO time base management fpr CYC. be8966a417 is described below commit be8966a4170e2e5adb5f10ab9903478830f1bdf8 Author: Cesar Garcia <cesar.gar...@ceos.com.ve> AuthorDate: Thu Mar 14 12:58:33 2024 -0400 Modified the cyclical subscription system. TODO time base management fpr CYC. --- .../s7/readwrite/protocol/S7HPlcConnection.java | 12 + .../s7/readwrite/protocol/S7ProtocolLogic.java | 13 +- .../readwrite/utils/S7PlcSubscriptionRequest.java | 275 +++++++++++++++++++++ 3 files changed, 294 insertions(+), 6 deletions(-) diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java index fbe8538a61..468ec8e72a 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java @@ -46,6 +46,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; +import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException; +import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; +import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionRequest; +import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionRequest; /** * This object generates the main connection and includes the management @@ -417,4 +421,12 @@ public class S7HPlcConnection extends DefaultNettyPlcConnection implements Runna return null; } + @Override + public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() { + if (!isSubscribeSupported()) { + throw new PlcUnsupportedOperationException("The connection does not support subscription"); + } + return new S7PlcSubscriptionRequest.Builder(this, getPlcTagHandler()); + } + } diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java index ee64e9b0ec..e758d2fb85 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java @@ -67,6 +67,7 @@ import org.apache.plc4x.java.s7.events.S7AlarmEvent; import org.apache.plc4x.java.s7.events.S7ModeEvent; import org.apache.plc4x.java.s7.events.S7SysEvent; import org.apache.plc4x.java.s7.events.S7UserEvent; +import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionRequest; /** * The S7 Protocol states that there can not be more then {min(maxAmqCaller, maxAmqCallee} "ongoing" requests. @@ -397,7 +398,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { futures.put("DATA_", new CompletableFuture<>()); - DefaultPlcSubscriptionRequest request = (DefaultPlcSubscriptionRequest) subscriptionRequest; + S7PlcSubscriptionRequest request = (S7PlcSubscriptionRequest) subscriptionRequest; int tpduId = getTpduId(); @@ -541,7 +542,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { return future; } - private S7Message encodeEventSubscriptionRequest(DefaultPlcSubscriptionRequest request, int tpduId) { + private S7Message encodeEventSubscriptionRequest(S7PlcSubscriptionRequest request, int tpduId) { List<S7ParameterUserDataItem> parameterItems = new ArrayList<>(request.getNumberOfTags()); List<S7PayloadUserDataItem> payloadItems = new ArrayList<>(request.getNumberOfTags()); @@ -924,7 +925,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { new S7PayloadUserData(payloadItems))); } - private S7Message encodeAlarmQueryRequest(DefaultPlcSubscriptionRequest request, int tpduId) { + private S7Message encodeAlarmQueryRequest(S7PlcSubscriptionRequest request, int tpduId) { List<S7ParameterUserDataItem> parameterItems = new ArrayList<>(request.getNumberOfTags()); List<S7PayloadUserDataItem> payloadItems = new ArrayList<>(request.getNumberOfTags()); @@ -956,11 +957,11 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { new S7PayloadUserData(payloadItems)); } - private void encodeCycledSubscriptionRequest(DefaultPlcSubscriptionRequest request, int tpduId) { + private void encodeCycledSubscriptionRequest(S7PlcSubscriptionRequest request, int tpduId) { } - private S7Message encodeCycledS7ANYSubscriptionRequest(DefaultPlcSubscriptionRequest request, int tpduId) { + private S7Message encodeCycledS7ANYSubscriptionRequest(S7PlcSubscriptionRequest request, int tpduId) { List<S7ParameterUserDataItem> parameterItems = new ArrayList<>(request.getNumberOfTags()); List<S7PayloadUserDataItem> payloadItems = new ArrayList<>(request.getNumberOfTags()); @@ -1040,7 +1041,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { } - private S7Message encodeCycledDBREADSubscriptionRequest(DefaultPlcSubscriptionRequest request, int tpduId) { + private S7Message encodeCycledDBREADSubscriptionRequest(S7PlcSubscriptionRequest request, int tpduId) { List<S7ParameterUserDataItem> parameterItems = new ArrayList<>(request.getNumberOfTags()); List<S7PayloadUserDataItem> payloadItems = new ArrayList<>(request.getNumberOfTags()); diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java new file mode 100644 index 0000000000..16d0f59096 --- /dev/null +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.plc4x.java.s7.readwrite.utils; + +import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; +import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent; +import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; +import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse; +import org.apache.plc4x.java.api.model.PlcSubscriptionTag; +import org.apache.plc4x.java.api.model.PlcTag; +import org.apache.plc4x.java.api.types.PlcSubscriptionType; +import org.apache.plc4x.java.spi.connection.PlcTagHandler; +import org.apache.plc4x.java.spi.generation.SerializationException; +import org.apache.plc4x.java.spi.generation.WriteBuffer; +import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag; +import org.apache.plc4x.java.spi.utils.Serializable; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.plc4x.java.s7.readwrite.TimeBase; +import static org.apache.plc4x.java.s7.readwrite.TimeBase.B01SEC; +import org.apache.plc4x.java.s7.readwrite.tag.S7SubscriptionTag; +import org.apache.plc4x.java.s7.readwrite.tag.S7Tag; +import org.apache.plc4x.java.s7.readwrite.types.S7SubscriptionType; +import org.apache.plc4x.java.spi.messages.PlcSubscriber; + +public class S7PlcSubscriptionRequest implements PlcSubscriptionRequest, Serializable { + + private final PlcSubscriber subscriber; + + private final LinkedHashMap<String, PlcSubscriptionTag> tags; + + private final LinkedHashMap<String, List<Consumer<PlcSubscriptionEvent>>> preRegisteredConsumers; + + public S7PlcSubscriptionRequest(PlcSubscriber subscriber, + LinkedHashMap<String, PlcSubscriptionTag> tags, + LinkedHashMap<String, List<Consumer<PlcSubscriptionEvent>>> preRegisteredConsumers) { + this.subscriber = subscriber; + this.tags = tags; + this.preRegisteredConsumers = preRegisteredConsumers; + } + + @Override + public CompletableFuture<PlcSubscriptionResponse> execute() { + return subscriber.subscribe(this); + } + + @Override + public int getNumberOfTags() { + return tags.size(); + } + + @Override + public LinkedHashSet<String> getTagNames() { + return new LinkedHashSet<>(tags.keySet()); + } + + @Override + public PlcSubscriptionTag getTag(String name) { + return tags.get(name); + } + + @Override + public List<PlcSubscriptionTag> getTags() { + return new ArrayList<>(tags.values()); + } + + @Override + public Map<String, List<Consumer<PlcSubscriptionEvent>>> getPreRegisteredConsumers() { + return new LinkedHashMap<>(preRegisteredConsumers); + } + + @Override + public void serialize(WriteBuffer writeBuffer) throws SerializationException { + writeBuffer.pushContext("PlcSubscriptionRequest"); + + writeBuffer.pushContext("tags"); + for (Map.Entry<String, PlcSubscriptionTag> tagEntry : tags.entrySet()) { + String tagName = tagEntry.getKey(); + writeBuffer.pushContext(tagName); + PlcTag tag = tagEntry.getValue(); + if (!(tag instanceof Serializable)) { + throw new RuntimeException("Error serializing. Tag doesn't implement XmlSerializable"); + } + ((Serializable) tag).serialize(writeBuffer); + writeBuffer.popContext(tagName); + } + writeBuffer.popContext("tags"); + + writeBuffer.popContext("PlcSubscriptionRequest"); + } + + public static class Builder implements PlcSubscriptionRequest.Builder { + + private final PlcSubscriber subscriber; + private final PlcTagHandler tagHandler; + private final Map<String, BuilderItem> tags; + private final LinkedHashMap<String, List<Consumer<PlcSubscriptionEvent>>> preRegisteredConsumers; + + public Builder(PlcSubscriber subscriber, PlcTagHandler tagHandler) { + this.subscriber = subscriber; + this.tagHandler = tagHandler; + this.tags = new TreeMap<>(); + this.preRegisteredConsumers = new LinkedHashMap<>(); + } + + @Override + public PlcSubscriptionRequest.Builder addCyclicTagAddress(String name, String tagAddress, Duration pollingInterval) { + if (tags.containsKey(name)) { + throw new PlcRuntimeException("Duplicate tag definition '" + name + "'"); + } + TimeBase tb = getTimeBase(pollingInterval); + short multiplier = getMultiplier(tb, pollingInterval); + S7Tag[] s7tags = new S7Tag[]{S7Tag.of(tagAddress)}; + S7SubscriptionTag tag = new S7SubscriptionTag(S7SubscriptionType.CYCLIC_SUBSCRIPTION, s7tags, tb, multiplier); + tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CYCLIC, pollingInterval)); + return this; + } + + @Override + public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag tag, Duration pollingInterval) { + if (tags.containsKey(name)) { + throw new PlcRuntimeException("Duplicate tag definition '" + name + "'"); + } + if ((tag instanceof S7SubscriptionTag) == false){ + throw new PlcRuntimeException("Tag is not of type S7SubcriptionTag"); + } + tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.CYCLIC, pollingInterval)); + return this; + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress) { + throw new PlcRuntimeException("Feature currently not supported."); + } + + @Override + public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag) { + throw new PlcRuntimeException("Feature currently not supported."); + } + + @Override + public PlcSubscriptionRequest.Builder addEventTagAddress(String name, String tagAddress) { + if (tags.containsKey(name)) { + throw new PlcRuntimeException("Duplicate tag definition '" + name + "'"); + } + PlcTag tag = tagHandler.parseTag(tagAddress); + if ((tag instanceof S7SubscriptionTag) == false){ + throw new PlcRuntimeException("Tag address is not of type S7SubcriptionTag"); + } + tags.put(name, new BuilderItem(() -> tagHandler.parseTag(tagAddress), PlcSubscriptionType.EVENT)); + return this; + } + + @Override + public PlcSubscriptionRequest.Builder addEventTag(String name, PlcTag tag) { + if (tags.containsKey(name)) { + throw new PlcRuntimeException("Duplicate tag definition '" + name + "'"); + } + if ((tag instanceof S7SubscriptionTag) == false){ + throw new PlcRuntimeException("Tag is not of type S7SubcriptionTag"); + } + tags.put(name, new BuilderItem(() -> tag, PlcSubscriptionType.EVENT)); + return this; + } + + @Override + public PlcSubscriptionRequest.Builder addPreRegisteredConsumer(String name, Consumer<PlcSubscriptionEvent> consumer) { + preRegisteredConsumers.putIfAbsent(name, new LinkedList<>()); + preRegisteredConsumers.get(name).add(consumer); + return this; + } + + @Override + public PlcSubscriptionRequest build() { + LinkedHashMap<String, PlcSubscriptionTag> parsedTags = new LinkedHashMap<>(); + + tags.forEach((name, builderItem) -> { + PlcTag parsedTag = builderItem.tag.get(); + parsedTags.put(name, new DefaultPlcSubscriptionTag(builderItem.plcSubscriptionType, parsedTag, builderItem.duration)); + }); + preRegisteredConsumers.forEach((tagName, ignored) -> { + if (!tags.containsKey(tagName)) { + throw new RuntimeException("tagName " + tagName + "for preRegisteredConsumer not found"); + } + }); + return new S7PlcSubscriptionRequest(subscriber, parsedTags, preRegisteredConsumers); + } + + private static class BuilderItem { + private final Supplier<PlcTag> tag; + private final PlcSubscriptionType plcSubscriptionType; + private final Duration duration; + + private BuilderItem(Supplier<PlcTag> tag, PlcSubscriptionType plcSubscriptionType) { + this(tag, plcSubscriptionType, null); + } + + private BuilderItem(Supplier<PlcTag> tag, PlcSubscriptionType plcSubscriptionType, Duration duration) { + this.tag = tag; + this.plcSubscriptionType = plcSubscriptionType; + this.duration = duration; + } + + } + + private TimeBase getTimeBase(Duration duration) { + if (duration.equals(Duration.ZERO)) { + throw new PlcRuntimeException("Subscription time cannot be zero."); + } + long millis = duration.toMillis(); + if (millis <= 25500) { + return TimeBase.B01SEC; + } if (millis <= 255000) { + return TimeBase.B1SEC; + } if (millis <= 2550000) { + return TimeBase.B10SEC; + } + + throw new PlcRuntimeException("The maximum subscription time of 2550 sec."); + } + + //TODO: Chek multiplier is 1-99 in BCD?? + private short getMultiplier(TimeBase tbase, Duration duration) { + short multiplier = 1; + if (duration.equals(Duration.ZERO)) { + throw new PlcRuntimeException("Subscription time cannot be zero."); + } + long millis = duration.toMillis(); + switch(tbase) { + case B01SEC:; + if (millis > 100) { + multiplier = (short) (millis / 100); + } + break; + case B1SEC:; + multiplier = (short) (millis / 1000); + break; + case B10SEC:; + multiplier = (short) (millis / 10000); + break; + + } + return multiplier; + } + + } + + @Override + public String toString() { + return "DefaultPlcSubscriptionRequest{" + + "subscriber=" + subscriber + + ", tags=" + tags + + '}'; + } +}