This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 6073d4bf47 feat(plc4j/spi): Add option to synchronously await response
from PLC (#1163)
6073d4bf47 is described below
commit 6073d4bf4755c31600e858745bac45a12e937d6f
Author: Rajmund Takács <[email protected]>
AuthorDate: Wed Oct 25 09:09:15 2023 +0200
feat(plc4j/spi): Add option to synchronously await response from PLC (#1163)
There might be cases when the driver needs to wait until the device
responds to our last request before sending the next one. This
feature attempts to make that more convenient.
---
.../apache/plc4x/java/spi/ConversationContext.java | 3 +
.../apache/plc4x/java/spi/Plc4xNettyWrapper.java | 19 ++++--
.../java/spi/internal/DefaultContextHandler.java | 17 +++--
.../spi/internal/DefaultExpectRequestContext.java | 2 +-
.../spi/internal/DefaultSendRequestContext.java | 2 +-
.../java/spi/internal/HandlerRegistration.java | 75 ++++++++++++++++++++--
.../plc4x/java/spi/Plc4xNettyWrapperTest.java | 3 +-
7 files changed, 99 insertions(+), 22 deletions(-)
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
index 94b09a282b..72508b5484 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
@@ -19,6 +19,7 @@
package org.apache.plc4x.java.spi;
import io.netty.channel.Channel;
+import java.util.concurrent.ExecutionException;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.spi.configuration.Configuration;
@@ -92,6 +93,8 @@ public interface ConversationContext<T> {
void cancel();
+ void awaitResponse() throws InterruptedException, ExecutionException;
+
}
}
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 27abd9324d..e1f708072d 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -152,6 +152,11 @@ public class Plc4xNettyWrapper<T> extends
MessageToMessageCodec<T, Object> {
continue;
}
// Timeout?
+ if (registration.isDone()) {
+ logger.debug("Removing {} as it's already done. Timed out?",
registration);
+ iter.remove();
+ continue;
+ }
logger.trace("Checking handler {} for Object of type {}",
registration, t.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(t)) {
logger.trace("Handler {} has right expected type {}, checking
condition", registration, registration.getExpectClazz().getSimpleName());
@@ -231,6 +236,9 @@ public class Plc4xNettyWrapper<T> extends
MessageToMessageCodec<T, Object> {
completionCallback.andThen(handler.getPacketConsumer()),
handler.getOnTimeoutConsumer(),
handler.getErrorConsumer(),
+ handler::confirmHandled,
+ handler::confirmError,
+ handler::cancel,
handler.getTimeout()
);
deferred.set(registration);
@@ -238,12 +246,11 @@ public class Plc4xNettyWrapper<T> extends
MessageToMessageCodec<T, Object> {
}
private Consumer<TimeoutException>
onTimeout(AtomicReference<HandlerRegistration> reference,
Consumer<TimeoutException> onTimeoutConsumer) {
- return new Consumer<TimeoutException>() {
- @Override
- public void accept(TimeoutException e) {
- registeredHandlers.remove(reference.get());
- onTimeoutConsumer.accept(e);
- }
+ return timeoutException -> {
+ final HandlerRegistration registration = reference.get();
+ registeredHandlers.remove(registration);
+ onTimeoutConsumer.accept(timeoutException);
+ registration.confirmError();
};
}
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
index e7e9e163e7..e99ed50942 100644
---
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
+++
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
@@ -18,27 +18,32 @@
*/
package org.apache.plc4x.java.spi.internal;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.plc4x.java.spi.ConversationContext;
-import java.util.function.BooleanSupplier;
-
class DefaultContextHandler implements ConversationContext.ContextHandler {
- private final BooleanSupplier getDone;
+ private final Future<Void> awaitable;
private final Runnable cancel;
- public DefaultContextHandler(BooleanSupplier getDone, Runnable cancel) {
- this.getDone = getDone;
+ public DefaultContextHandler(Future<Void> awaitable, Runnable cancel) {
+ this.awaitable = awaitable;
this.cancel = cancel;
}
@Override
public boolean isDone() {
- return this.getDone.getAsBoolean();
+ return this.awaitable.isDone();
}
@Override
public void cancel() {
this.cancel.run();
}
+
+ @Override
+ public void awaitResponse() throws InterruptedException,
ExecutionException {
+ this.awaitable.get();
+ }
}
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
index 74bf96df9b..ec9be896c3 100644
---
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
+++
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
@@ -82,7 +82,7 @@ public class DefaultExpectRequestContext<T> implements
ConversationContext.Expec
this.packetConsumer = packetConsumer;
registration = new HandlerRegistration(commands, expectClazz,
packetConsumer, onTimeoutConsumer, errorConsumer, timeout);
finisher.accept(registration);
- return new DefaultContextHandler(registration::hasHandled,
registration::cancel);
+ return new DefaultContextHandler(registration, registration::cancel);
}
@Override
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index cfa448806f..c975a65149 100644
---
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -95,7 +95,7 @@ public class DefaultSendRequestContext<T> implements
ConversationContext.SendReq
onTimeoutConsumer, errorConsumer, timeout);
finisher.accept(registration);
context.sendToWire(request);
- return new DefaultContextHandler(registration::hasHandled,
registration::cancel);
+ return new DefaultContextHandler(registration, registration::cancel);
}
@Override
diff --git
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
index c71b203a3d..8ada15fee9 100644
---
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
+++
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
@@ -22,13 +22,17 @@ import io.vavr.control.Either;
import java.time.Duration;
import java.util.Deque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
-public class HandlerRegistration {
+public class HandlerRegistration implements Future<Void> {
private static int counter = 0;
@@ -43,17 +47,36 @@ public class HandlerRegistration {
private final Consumer<TimeoutException> onTimeoutConsumer;
private final BiConsumer<?, ? extends Throwable> errorConsumer;
+ private final Runnable onHandled;
+ private final Runnable onError;
+ private final Runnable onCancelled;
private final Duration timeout;
- private volatile boolean cancelled = false;
- private volatile boolean handled = false;
+ private final CompletableFuture<Void> handled = new CompletableFuture<>();
public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>>
commands, Class<?> expectClazz, Consumer<?> packetConsumer,
Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends
Throwable> errorConsumer, Duration timeout) {
+ this(
+ commands,
+ expectClazz,
+ packetConsumer,
+ onTimeoutConsumer,
+ errorConsumer,
+ () -> {},
+ () -> {},
+ () -> {},
+ timeout
+ );
+ }
+
+ public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>>
commands, Class<?> expectClazz, Consumer<?> packetConsumer,
Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends
Throwable> errorConsumer, Runnable onHandled, Runnable onError, Runnable
onCancelled, Duration timeout) {
this.commands = commands;
this.expectClazz = expectClazz;
this.packetConsumer = packetConsumer;
this.onTimeoutConsumer = onTimeoutConsumer;
this.errorConsumer = errorConsumer;
+ this.onHandled = onHandled;
+ this.onError = onError;
+ this.onCancelled = onCancelled;
this.timeout = timeout;
}
@@ -82,21 +105,59 @@ public class HandlerRegistration {
}
public void cancel() {
- this.cancelled = true;
+ handled.cancel(true);
+ onCancelled.run();
+ }
+
+ @Override
+ public boolean cancel(boolean ignored) {
+ if (isCancelled()) {
+ return false;
+ } else {
+ cancel();
+ return true;
+ }
}
public boolean isCancelled() {
- return this.cancelled;
+ return handled.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return hasHandled();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ return handled.get();
+ }
+
+ @Override
+ public Void get(long amount, TimeUnit timeUnit) throws
InterruptedException, ExecutionException, TimeoutException {
+ return handled.get(amount, timeUnit);
}
public void confirmHandled() {
- this.handled = true;
+ confirmCompleted();
+ this.onHandled.run();
+ }
+
+ public void confirmError() {
+ confirmCompleted();
+ this.onError.run();
+ }
+
+ public void confirmCompleted() {
+ this.handled.complete(null);
}
public boolean hasHandled() {
- return this.handled;
+ return this.handled.isDone();
}
+
+
@Override
public String toString() {
return "HandlerRegistration#" + id;
diff --git
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
index 07aa5a0a1c..90695013d6 100644
---
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
+++
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
@@ -86,7 +86,7 @@ class Plc4xNettyWrapperTest {
.onError((value, throwable) -> error.set(true))
.handle((answer) -> handled.set(true));
- Thread.sleep(750);
+ handler.awaitResponse();
verify(true, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
@@ -104,6 +104,7 @@ class Plc4xNettyWrapperTest {
verify(false, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
+ handler.awaitResponse();
verify(false, false, true);
}