This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/PLC4X-207-fix-silent-exception
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit e028de586c7fe2736d0c3e2743d193d8e1f057b3
Author: Julian Feinauer <[email protected]>
AuthorDate: Wed Jul 8 21:47:29 2020 +0200

    PLC4X-207 When a Handler Timeout occurs cancel the read future to notify 
the user.
---
 .../s7/readwrite/protocol/S7ProtocolLogic.java     | 34 ++++++++++++++++++++--
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |  3 ++
 .../spi/internal/DefaultSendRequestContext.java    |  5 ----
 .../java/spi/internal/HandlerRegistration.java     |  4 +++
 .../plc4x/java/spi/optimizer/BaseOptimizer.java    |  5 +++-
 5 files changed, 42 insertions(+), 9 deletions(-)

diff --git 
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
 
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index 0221603..6a057e5 100644
--- 
a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ 
b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -67,6 +67,7 @@ import org.apache.plc4x.java.s7.readwrite.SzlDataTreeItem;
 import org.apache.plc4x.java.s7.readwrite.SzlId;
 import org.apache.plc4x.java.s7.readwrite.TPKTPacket;
 import org.apache.plc4x.java.s7.readwrite.context.S7DriverContext;
+import org.apache.plc4x.java.s7.readwrite.field.S7Field;
 import org.apache.plc4x.java.s7.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.s7.readwrite.types.COTPProtocolClass;
 import org.apache.plc4x.java.s7.readwrite.types.COTPTpduSize;
@@ -75,7 +76,6 @@ import 
org.apache.plc4x.java.s7.readwrite.types.DataTransportSize;
 import org.apache.plc4x.java.s7.readwrite.types.S7ControllerType;
 import org.apache.plc4x.java.s7.readwrite.types.SzlModuleTypeClass;
 import org.apache.plc4x.java.s7.readwrite.types.SzlSublist;
-import org.apache.plc4x.java.s7.readwrite.field.S7Field;
 import org.apache.plc4x.java.spi.ConversationContext;
 import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 import org.apache.plc4x.java.spi.context.DriverContext;
@@ -96,8 +96,10 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -139,11 +141,19 @@ public class S7ProtocolLogic extends 
Plc4xProtocolBase<TPKTPacket> {
             .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
             .check(p -> p.getPayload() instanceof COTPPacketConnectionResponse)
             .unwrap(p -> (COTPPacketConnectionResponse) p.getPayload())
+            .onTimeout(e -> {
+                logger.warn("Timeout during Connection establishing, closing 
channel...");
+                context.getChannel().close();
+            })
             .handle(cotpPacketConnectionResponse -> {
                 logger.debug("Got COTP Connection Response");
                 logger.debug("Sending S7 Connection Request");
                 
context.sendRequest(createS7ConnectionRequest(cotpPacketConnectionResponse))
                     .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+                    .onTimeout(e -> {
+                        logger.warn("Timeout during Connection establishing, 
closing channel...");
+                        context.getChannel().close();
+                    })
                     .unwrap(TPKTPacket::getPayload)
                     .only(COTPPacketData.class)
                     .unwrap(COTPPacket::getPayload)
@@ -177,6 +187,10 @@ public class S7ProtocolLogic extends 
Plc4xProtocolBase<TPKTPacket> {
                         TPKTPacket tpktPacket = createIdentifyRemoteMessage();
                         context.sendRequest(tpktPacket)
                             .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+                            .onTimeout(e -> {
+                                logger.warn("Timeout during Connection 
establishing, closing channel...");
+                                context.getChannel().close();
+                            })
                             .check(p -> p.getPayload() instanceof 
COTPPacketData)
                             .unwrap(p -> ((COTPPacketData) p.getPayload()))
                             .check(p -> p.getPayload() instanceof 
S7MessageUserData)
@@ -240,8 +254,14 @@ public class S7ProtocolLogic extends 
Plc4xProtocolBase<TPKTPacket> {
         RequestTransactionManager.RequestTransaction transaction = 
tm.startRequest();
         transaction.submit(() -> context.sendRequest(tpktPacket)
             .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
-            .onTimeout(future::completeExceptionally)
-            .onError((p, e) -> future.completeExceptionally(e))
+            .onTimeout((e) -> {
+                future.completeExceptionally(e);
+                transaction.endRequest();
+            })
+            .onError((p, e) -> {
+                future.completeExceptionally(e);
+                transaction.endRequest();
+            })
             .check(p -> p.getPayload() instanceof COTPPacketData)
             .unwrap(p -> (COTPPacketData) p.getPayload())
             .check(p -> p.getPayload() instanceof S7MessageResponseData)
@@ -299,6 +319,14 @@ public class S7ProtocolLogic extends 
Plc4xProtocolBase<TPKTPacket> {
         return future;
     }
 
+    /**
+     * This method is only called when there is no Response Handler.
+     */
+    @Override
+    protected void decode(ConversationContext<TPKTPacket> context, TPKTPacket 
msg) throws Exception {
+        throw new IllegalStateException("This should not happen!");
+    }
+
     @Override
     public void close(ConversationContext<TPKTPacket> context) {
         // TODO Implement Closing on Protocol Level
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index ca8dcce..994b940 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -125,6 +125,9 @@ public class Plc4xNettyWrapper<T> extends 
MessageToMessageCodec<T, Object> {
             // Check if the handler can still be used or should be removed
             if (registration.getTimeout().isBefore(Instant.now())) {
                 logger.debug("Removing {} as its timed out (was set till {})", 
registration, registration.getTimeout());
+                // Call callback
+                registration.handleTimeout();
+                // Remove
                 iter.remove();
                 continue;
             }
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 16eef84..a377a4d 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -120,11 +120,6 @@ public class DefaultSendRequestContext<T> implements 
ConversationContext.SendReq
         if (expectClazz == null) {
             throw new ConversationContext.PlcWiringException("expectResponse 
must be called before first unwrap");
         }
-        if (onTimeoutConsumer == null) {
-            onTimeoutConsumer = e -> {
-                // NOOP
-            };
-        }
         commands.addLast(Either.left(unwrapper));
         return new DefaultSendRequestContext<R>(commands, timeout, finisher, 
request, context, expectClazz, packetConsumer, onTimeoutConsumer, 
errorConsumer);
     }
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
index 38f5eed..fcab31b 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
@@ -79,6 +79,10 @@ public class HandlerRegistration {
         return timeout;
     }
 
+    public void handleTimeout() {
+        this.onTimeoutConsumer.accept(new TimeoutException("Timed out while 
waiting for response"));
+    }
+
     @Override
     public String toString() {
         return "HandlerRegistration#" + id;
diff --git 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java
 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java
index 3c252f4..00b97a6 100644
--- 
a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java
+++ 
b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java
@@ -152,7 +152,10 @@ public abstract class BaseOptimizer {
             }
 
             // As soon as all sub-futures are done, merge the individual 
responses back to one big response.
-            CompletableFuture.allOf(subFutures.values().toArray(new 
CompletableFuture[0])).thenApply(aVoid -> {
+            CompletableFuture.allOf(subFutures.values().toArray(new 
CompletableFuture[0])).handle((aVoid, t) -> {
+                if (t != null) {
+                    parentFuture.completeExceptionally(t);
+                }
                 Map<PlcRequest, Either<PlcResponse, Exception>> results = new 
HashMap<>();
                 for (Map.Entry<PlcRequest, CompletableFuture<PlcResponse>> 
subFutureEntry : subFutures.entrySet()) {
                     PlcRequest subRequest = subFutureEntry.getKey();

Reply via email to