This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6073d4bf47 feat(plc4j/spi): Add option to synchronously await response 
from PLC (#1163)
6073d4bf47 is described below

commit 6073d4bf4755c31600e858745bac45a12e937d6f
Author: Rajmund Takács <[email protected]>
AuthorDate: Wed Oct 25 09:09:15 2023 +0200

    feat(plc4j/spi): Add option to synchronously await response from PLC (#1163)
    
    There might be cases when the driver needs to wait until the device
    responds our last request, before sending the next request. This
    feature attempts to make this more convenient.
---
 .../apache/plc4x/java/spi/ConversationContext.java |  3 +
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   | 19 ++++--
 .../java/spi/internal/DefaultContextHandler.java   | 17 +++--
 .../spi/internal/DefaultExpectRequestContext.java  |  2 +-
 .../spi/internal/DefaultSendRequestContext.java    |  2 +-
 .../java/spi/internal/HandlerRegistration.java     | 75 ++++++++++++++++++++--
 .../plc4x/java/spi/Plc4xNettyWrapperTest.java      |  3 +-
 7 files changed, 99 insertions(+), 22 deletions(-)

diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
index 94b09a282b..72508b5484 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
@@ -19,6 +19,7 @@
 package org.apache.plc4x.java.spi;
 
 import io.netty.channel.Channel;
+import java.util.concurrent.ExecutionException;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.spi.configuration.Configuration;
@@ -92,6 +93,8 @@ public interface ConversationContext<T> {
 
         void cancel();
 
+        void awaitResponse() throws InterruptedException, ExecutionException;
+
     }
 
 }
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 27abd9324d..e1f708072d 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -152,6 +152,11 @@ public class Plc4xNettyWrapper<T> extends 
MessageToMessageCodec<T, Object> {
                 continue;
             }
             // Timeout?
+            if (registration.isDone()) {
+                logger.debug("Removing {} as it's already done. Timed out?", 
registration);
+                iter.remove();
+                continue;
+            }
             logger.trace("Checking handler {} for Object of type {}", 
registration, t.getClass().getSimpleName());
             if (registration.getExpectClazz().isInstance(t)) {
                 logger.trace("Handler {} has right expected type {}, checking 
condition", registration, registration.getExpectClazz().getSimpleName());
@@ -231,6 +236,9 @@ public class Plc4xNettyWrapper<T> extends 
MessageToMessageCodec<T, Object> {
             completionCallback.andThen(handler.getPacketConsumer()),
             handler.getOnTimeoutConsumer(),
             handler.getErrorConsumer(),
+            handler::confirmHandled,
+            handler::confirmError,
+            handler::cancel,
             handler.getTimeout()
         );
         deferred.set(registration);
@@ -238,12 +246,11 @@ public class Plc4xNettyWrapper<T> extends 
MessageToMessageCodec<T, Object> {
     }
 
     private Consumer<TimeoutException> 
onTimeout(AtomicReference<HandlerRegistration> reference, 
Consumer<TimeoutException> onTimeoutConsumer) {
-        return new Consumer<TimeoutException>() {
-            @Override
-            public void accept(TimeoutException e) {
-                registeredHandlers.remove(reference.get());
-                onTimeoutConsumer.accept(e);
-            }
+        return timeoutException -> {
+            final HandlerRegistration registration = reference.get();
+            registeredHandlers.remove(registration);
+            onTimeoutConsumer.accept(timeoutException);
+            registration.confirmError();
         };
     }
 
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
index e7e9e163e7..e99ed50942 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java
@@ -18,27 +18,32 @@
  */
 package org.apache.plc4x.java.spi.internal;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.plc4x.java.spi.ConversationContext;
 
-import java.util.function.BooleanSupplier;
-
 class DefaultContextHandler implements ConversationContext.ContextHandler {
 
-    private final BooleanSupplier getDone;
+    private final Future<Void> awaitable;
     private final Runnable cancel;
 
-    public DefaultContextHandler(BooleanSupplier getDone, Runnable cancel) {
-        this.getDone = getDone;
+    public DefaultContextHandler(Future<Void> awaitable, Runnable cancel) {
+        this.awaitable = awaitable;
         this.cancel = cancel;
     }
 
     @Override
     public boolean isDone() {
-        return this.getDone.getAsBoolean();
+        return this.awaitable.isDone();
     }
 
     @Override
     public void cancel() {
         this.cancel.run();
     }
+
+    @Override
+    public void awaitResponse() throws InterruptedException, 
ExecutionException {
+        this.awaitable.get();
+    }
 }
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
index 74bf96df9b..ec9be896c3 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
@@ -82,7 +82,7 @@ public class DefaultExpectRequestContext<T> implements 
ConversationContext.Expec
         this.packetConsumer = packetConsumer;
         registration = new HandlerRegistration(commands, expectClazz, 
packetConsumer, onTimeoutConsumer, errorConsumer, timeout);
         finisher.accept(registration);
-        return new DefaultContextHandler(registration::hasHandled, 
registration::cancel);
+        return new DefaultContextHandler(registration, registration::cancel);
     }
 
     @Override
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index cfa448806f..c975a65149 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -95,7 +95,7 @@ public class DefaultSendRequestContext<T> implements 
ConversationContext.SendReq
             onTimeoutConsumer, errorConsumer, timeout);
         finisher.accept(registration);
         context.sendToWire(request);
-        return new DefaultContextHandler(registration::hasHandled, 
registration::cancel);
+        return new DefaultContextHandler(registration, registration::cancel);
     }
 
     @Override
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
index c71b203a3d..8ada15fee9 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
@@ -22,13 +22,17 @@ import io.vavr.control.Either;
 
 import java.time.Duration;
 import java.util.Deque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-public class HandlerRegistration {
+public class HandlerRegistration implements Future<Void> {
 
     private static int counter = 0;
 
@@ -43,17 +47,36 @@ public class HandlerRegistration {
     private final Consumer<TimeoutException> onTimeoutConsumer;
 
     private final BiConsumer<?, ? extends Throwable> errorConsumer;
+    private final Runnable onHandled;
+    private final Runnable onError;
+    private final Runnable onCancelled;
     private final Duration timeout;
 
-    private volatile boolean cancelled = false;
-    private volatile boolean handled = false;
+    private final CompletableFuture<Void> handled = new CompletableFuture<>();
 
     public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> 
commands, Class<?> expectClazz, Consumer<?> packetConsumer, 
Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends 
Throwable> errorConsumer, Duration timeout) {
+        this(
+            commands,
+            expectClazz,
+            packetConsumer,
+            onTimeoutConsumer,
+            errorConsumer,
+            () -> {},
+            () -> {},
+            () -> {},
+            timeout
+        );
+    }
+
+    public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> 
commands, Class<?> expectClazz, Consumer<?> packetConsumer, 
Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends 
Throwable> errorConsumer, Runnable onHandled, Runnable onError, Runnable 
onCancelled, Duration timeout) {
         this.commands = commands;
         this.expectClazz = expectClazz;
         this.packetConsumer = packetConsumer;
         this.onTimeoutConsumer = onTimeoutConsumer;
         this.errorConsumer = errorConsumer;
+        this.onHandled = onHandled;
+        this.onError = onError;
+        this.onCancelled = onCancelled;
         this.timeout = timeout;
     }
 
@@ -82,21 +105,59 @@ public class HandlerRegistration {
     }
 
     public void cancel() {
-        this.cancelled = true;
+        handled.cancel(true);
+        onCancelled.run();
+    }
+
+    @Override
+    public boolean cancel(boolean ignored) {
+        if (isCancelled()) {
+            return false;
+        } else {
+            cancel();
+            return true;
+        }
     }
 
     public boolean isCancelled() {
-        return this.cancelled;
+        return handled.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+        return hasHandled();
+    }
+
+    @Override
+    public Void get() throws InterruptedException, ExecutionException {
+        return handled.get();
+    }
+
+    @Override
+    public Void get(long amount, TimeUnit timeUnit) throws 
InterruptedException, ExecutionException, TimeoutException {
+        return handled.get(amount, timeUnit);
     }
 
     public void confirmHandled() {
-        this.handled = true;
+        confirmCompleted();
+        this.onHandled.run();
+    }
+
+    public void confirmError() {
+        confirmCompleted();
+        this.onError.run();
+    }
+
+    public void confirmCompleted() {
+        this.handled.complete(null);
     }
 
     public boolean hasHandled() {
-        return this.handled;
+        return this.handled.isDone();
     }
 
+
+
     @Override
     public String toString() {
         return "HandlerRegistration#" + id;
diff --git 
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java 
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
index 07aa5a0a1c..90695013d6 100644
--- 
a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
+++ 
b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
@@ -86,7 +86,7 @@ class Plc4xNettyWrapperTest {
             .onError((value, throwable) -> error.set(true))
             .handle((answer) -> handled.set(true));
 
-        Thread.sleep(750);
+        handler.awaitResponse();
 
         verify(true, false, false);
         wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
@@ -104,6 +104,7 @@ class Plc4xNettyWrapperTest {
 
         verify(false, false, false);
         wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
+        handler.awaitResponse();
         verify(false, false, true);
     }
 

Reply via email to