This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new af9008c1e8 (feat): Add a minimum polling interval for CHANGE_ON_EVENT
subscription type (#2336)
af9008c1e8 is described below
commit af9008c1e81a8c44b7b36753279740aaadda974e
Author: Kirill Kostin <[email protected]>
AuthorDate: Tue Nov 11 10:35:46 2025 +0100
(feat): Add a minimum polling interval for CHANGE_ON_EVENT subscription
type (#2336)
* (feat): Add a minimum polling interval for CHANGE_ON_EVENT subscription
type
---
.../java/api/messages/PlcSubscriptionRequest.java | 48 ++++++++++++++++++++-
.../readwrite/utils/S7PlcSubscriptionRequest.java | 49 ++++++++++++----------
.../messages/DefaultPlcSubscriptionRequest.java | 33 +++++++++++----
.../java/utils/cache/LeasedPlcConnection.java | 20 +++++++++
4 files changed, 121 insertions(+), 29 deletions(-)
diff --git
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index 862286c257..f1a0fe23bd 100644
---
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
+++
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -67,7 +67,7 @@ public interface PlcSubscriptionRequest extends
PlcSubscriptionTagRequest {
* Adds a new tag to the to be constructed request which should be
polled cyclically.
*
* @param name alias of the tag.
- * @param tag tag instance for accessing the tag.
+ * @param tag tag instance for accessing the tag.
* @param pollingInterval interval, in which the tag should be polled.
* @return builder.
*/
@@ -105,6 +105,29 @@ public interface PlcSubscriptionRequest extends
PlcSubscriptionTagRequest {
*/
PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name,
String tagAddress, Consumer<PlcSubscriptionEvent> consumer);
+ /**
+ * Adds a new tag to the to be constructed request which should be
updated as soon as
+ * a value changes in the PLC.
+ *
+ * @param name alias of the tag.
+ * @param tagAddress tag address string for accessing the tag.
+ * @param minInterval minimum interval between updates (limits the number of
events for high-frequency changes).
+ * @return builder.
+ */
+ PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name,
String tagAddress, Duration minInterval);
+
+ /**
+ * Adds a new tag to the to be constructed request which should be
updated as soon as
+ * a value changes in the PLC.
+ *
+ * @param name alias of the tag.
+ * @param tagAddress tag address string for accessing the tag.
+ * @param consumer consumer for receiving update events for a given
tag only.
+ * @param minInterval minimum interval between updates (limits the number of
events for high-frequency changes).
+ * @return builder.
+ */
+ PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name,
String tagAddress, Consumer<PlcSubscriptionEvent> consumer, Duration
minInterval);
+
/**
* Adds a new tag to the to be constructed request which should be
updated as soon as
* a value changes in the PLC.
@@ -126,6 +149,29 @@ public interface PlcSubscriptionRequest extends
PlcSubscriptionTagRequest {
*/
PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag
tag, Consumer<PlcSubscriptionEvent> consumer);
+ /**
+ * Adds a new tag to the to be constructed request which should be
updated as soon as
+ * a value changes in the PLC.
+ *
+ * @param name alias of the tag.
+ * @param tag tag instance for accessing the tag.
+ * @param minInterval minimum interval between updates (limits the number of
events for high-frequency changes).
+ * @return builder.
+ */
+ PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag
tag, Duration minInterval);
+
+ /**
+ * Adds a new tag to the to be constructed request which should be
updated as soon as
+ * a value changes in the PLC.
+ *
+ * @param name alias of the tag.
+ * @param tag tag instance for accessing the tag.
+ * @param consumer consumer for receiving update events for a given
tag only.
+ * @param minInterval minimum interval between updates (limits the number of
events for high-frequency changes).
+ * @return builder.
+ */
+ PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag
tag, Consumer<PlcSubscriptionEvent> consumer, Duration minInterval);
+
/**
* Adds a new subscription to the to be constructed request which
should be updated
* as soon as an event occurs.
diff --git
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java
index f317cf2a49..d0f58ea1b2 100644
---
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java
+++
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/utils/S7PlcSubscriptionRequest.java
@@ -21,25 +21,18 @@ package org.apache.plc4x.java.s7.readwrite.utils;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcSubscriptionTag;
import org.apache.plc4x.java.api.model.PlcTag;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.spi.connection.PlcTagHandler;
-import org.apache.plc4x.java.spi.generation.SerializationException;
-import org.apache.plc4x.java.spi.generation.WriteBuffer;
import org.apache.plc4x.java.spi.messages.utils.DefaultPlcTagItem;
import org.apache.plc4x.java.spi.messages.utils.PlcTagItem;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag;
-import org.apache.plc4x.java.spi.utils.Serializable;
import java.time.Duration;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.apache.plc4x.java.s7.readwrite.TimeBase;
import org.apache.plc4x.java.s7.readwrite.tag.S7SubscriptionTag;
@@ -225,40 +218,54 @@ public class S7PlcSubscriptionRequest extends
DefaultPlcSubscriptionRequest {
@Override
public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress) {
- return addChangeOfStateTagAddress(tagAddress, name);
+ return addChangeOfStateTagAddress(name, tagAddress, null, null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress, Duration
minInterval) {
+ return addChangeOfStateTagAddress(name, tagAddress, null,
minInterval);
}
- /*
- *
- */
@Override
public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress,
Consumer<PlcSubscriptionEvent> consumer) {
+ return addChangeOfStateTagAddress(name, tagAddress, consumer,
null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress,
Consumer<PlcSubscriptionEvent> consumer, Duration minInterval) {
if (tags.containsKey(name)) {
throw new PlcRuntimeException(CONST_DUPLICATE_TAG + " '" +
name + "'");
}
- S7Tag[] s7tags = new S7Tag[]{S7Tag.of(tagAddress)};
- S7SubscriptionTag tag = new
S7SubscriptionTag(S7SubscriptionType.CYCLIC_SUBSCRIPTION, s7tags,
TimeBase.B01SEC, (short) 1);
- tags.put(name, new BuilderItem(() -> tag,
PlcSubscriptionType.CHANGE_OF_STATE, consumer));
+ S7Tag[] s7tags = new S7Tag[]{S7Tag.of(tagAddress)};
+ S7SubscriptionTag tag = new
S7SubscriptionTag(S7SubscriptionType.CYCLIC_SUBSCRIPTION, s7tags,
TimeBase.B01SEC, (short) 1);
+ tags.put(name, new BuilderItem(() -> tag,
PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer));
return this;
}
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag) {
- return addChangeOfStateTag(name, tag, null);
+ return addChangeOfStateTag(name, tag, null, null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Duration minInterval) {
+ return addChangeOfStateTag(name, tag, null, minInterval);
}
- /*
- *
- */
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Consumer<PlcSubscriptionEvent> consumer) {
+ return addChangeOfStateTag(name, tag, consumer, null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Consumer<PlcSubscriptionEvent> consumer, Duration minInterval) {
if (tags.containsKey(name)) {
throw new PlcRuntimeException(CONST_DUPLICATE_TAG + " '" +
name + "'");
}
- if (!(tag instanceof S7SubscriptionTag)){
+ if (!(tag instanceof S7SubscriptionTag)) {
throw new PlcRuntimeException(CONST_INVALID_TYPE);
- }
- tags.put(name, new BuilderItem(() -> tag,
PlcSubscriptionType.CHANGE_OF_STATE, consumer));
+ }
+ tags.put(name, new BuilderItem(() -> tag,
PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer));
return this;
}
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
index 1e1e8a98aa..f8506fc64f 100644
---
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
+++
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
@@ -179,31 +179,50 @@ public class DefaultPlcSubscriptionRequest implements
PlcSubscriptionRequest, Se
@Override
public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress) {
- addChangeOfStateTagAddress(name, tagAddress, null);
- return this;
+ return addChangeOfStateTagAddress(name, tagAddress, null, null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress, Duration
minInterval) {
+ return addChangeOfStateTagAddress(name, tagAddress, null,
minInterval);
}
@Override
public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress,
Consumer<PlcSubscriptionEvent> consumer) {
+ return addChangeOfStateTagAddress(name, tagAddress, consumer,
null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress,
Consumer<PlcSubscriptionEvent> consumer, Duration minInterval) {
if (tags.containsKey(name)) {
throw new PlcRuntimeException("Duplicate tag definition '" +
name + "'");
}
- tags.put(name, new BuilderItem(() ->
tagHandler.parseTag(tagAddress), PlcSubscriptionType.CHANGE_OF_STATE,
consumer));
- return null;
+ tags.put(name, new BuilderItem(() ->
tagHandler.parseTag(tagAddress), PlcSubscriptionType.CHANGE_OF_STATE,
minInterval, consumer));
+ return this;
}
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag) {
- addChangeOfStateTag(name, tag, null);
- return this;
+ return addChangeOfStateTag(name, tag, null, null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Duration minInterval) {
+ return addChangeOfStateTag(name, tag, null, minInterval);
}
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Consumer<PlcSubscriptionEvent> consumer) {
+ return addChangeOfStateTag(name, tag, consumer, null);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Consumer<PlcSubscriptionEvent> consumer,
+ Duration
minInterval) {
if (tags.containsKey(name)) {
throw new PlcRuntimeException("Duplicate tag definition '" +
name + "'");
}
- tags.put(name, new BuilderItem(() -> tag,
PlcSubscriptionType.CHANGE_OF_STATE, consumer));
+ tags.put(name, new BuilderItem(() -> tag,
PlcSubscriptionType.CHANGE_OF_STATE, minInterval, consumer));
return this;
}
diff --git
a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java
b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java
index 3b8b46ce35..2f17ee5371 100644
---
a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java
+++
b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/cache/LeasedPlcConnection.java
@@ -394,6 +394,16 @@ public class LeasedPlcConnection implements
EventPlcConnection {
return innerBuilder.addChangeOfStateTagAddress(name,
tagAddress, consumer);
}
+ @Override
+ public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress, Duration
minInterval) {
+ return innerBuilder.addChangeOfStateTagAddress(name,
tagAddress, minInterval);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder
addChangeOfStateTagAddress(String name, String tagAddress,
Consumer<PlcSubscriptionEvent> consumer, Duration minInterval) {
+ return innerBuilder.addChangeOfStateTagAddress(name,
tagAddress, consumer, minInterval);
+ }
+
@Override
public PlcSubscriptionRequest.Builder addChangeOfStateTag(String
name, PlcTag tag) {
return innerBuilder.addChangeOfStateTag(name, tag);
@@ -404,6 +414,16 @@ public class LeasedPlcConnection implements
EventPlcConnection {
return innerBuilder.addChangeOfStateTag(name, tag, consumer);
}
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String
name, PlcTag tag, Duration minInterval) {
+ return innerBuilder.addChangeOfStateTag(name, tag,
minInterval);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String
name, PlcTag tag, Consumer<PlcSubscriptionEvent> consumer, Duration
minInterval) {
+ return innerBuilder.addChangeOfStateTag(name, tag, consumer,
minInterval);
+ }
+
@Override
public PlcSubscriptionRequest.Builder addEventTagAddress(String
name, String tagAddress) {
return innerBuilder.addEventTagAddress(name, tagAddress);