This is an automated email from the ASF dual-hosted git repository. jfeinauer pushed a commit to branch feature/PLC4X-207-fix-silent-exception in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit e028de586c7fe2736d0c3e2743d193d8e1f057b3 Author: Julian Feinauer <[email protected]> AuthorDate: Wed Jul 8 21:47:29 2020 +0200 PLC4X-207 When a Handler Timeout occurs cancel the read future to notify the user. --- .../s7/readwrite/protocol/S7ProtocolLogic.java | 34 ++++++++++++++++++++-- .../apache/plc4x/java/spi/Plc4xNettyWrapper.java | 3 ++ .../spi/internal/DefaultSendRequestContext.java | 5 ---- .../java/spi/internal/HandlerRegistration.java | 4 +++ .../plc4x/java/spi/optimizer/BaseOptimizer.java | 5 +++- 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java index 0221603..6a057e5 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java @@ -67,6 +67,7 @@ import org.apache.plc4x.java.s7.readwrite.SzlDataTreeItem; import org.apache.plc4x.java.s7.readwrite.SzlId; import org.apache.plc4x.java.s7.readwrite.TPKTPacket; import org.apache.plc4x.java.s7.readwrite.context.S7DriverContext; +import org.apache.plc4x.java.s7.readwrite.field.S7Field; import org.apache.plc4x.java.s7.readwrite.io.DataItemIO; import org.apache.plc4x.java.s7.readwrite.types.COTPProtocolClass; import org.apache.plc4x.java.s7.readwrite.types.COTPTpduSize; @@ -75,7 +76,6 @@ import org.apache.plc4x.java.s7.readwrite.types.DataTransportSize; import org.apache.plc4x.java.s7.readwrite.types.S7ControllerType; import org.apache.plc4x.java.s7.readwrite.types.SzlModuleTypeClass; import org.apache.plc4x.java.s7.readwrite.types.SzlSublist; -import org.apache.plc4x.java.s7.readwrite.field.S7Field; import org.apache.plc4x.java.spi.ConversationContext; import org.apache.plc4x.java.spi.Plc4xProtocolBase; import org.apache.plc4x.java.spi.context.DriverContext; @@ -96,8 +96,10 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -139,11 +141,19 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT) .check(p -> p.getPayload() instanceof COTPPacketConnectionResponse) .unwrap(p -> (COTPPacketConnectionResponse) p.getPayload()) + .onTimeout(e -> { + logger.warn("Timeout during Connection establishing, closing channel..."); + context.getChannel().close(); + }) .handle(cotpPacketConnectionResponse -> { logger.debug("Got COTP Connection Response"); logger.debug("Sending S7 Connection Request"); context.sendRequest(createS7ConnectionRequest(cotpPacketConnectionResponse)) .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT) + .onTimeout(e -> { + logger.warn("Timeout during Connection establishing, closing channel..."); + context.getChannel().close(); + }) .unwrap(TPKTPacket::getPayload) .only(COTPPacketData.class) .unwrap(COTPPacket::getPayload) @@ -177,6 +187,10 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { TPKTPacket tpktPacket = createIdentifyRemoteMessage(); context.sendRequest(tpktPacket) .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT) + .onTimeout(e -> { + logger.warn("Timeout during Connection establishing, closing channel..."); + context.getChannel().close(); + }) .check(p -> p.getPayload() instanceof COTPPacketData) .unwrap(p -> ((COTPPacketData) p.getPayload())) .check(p -> p.getPayload() instanceof S7MessageUserData) @@ -240,8 +254,14 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); transaction.submit(() -> context.sendRequest(tpktPacket) .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT) - .onTimeout(future::completeExceptionally) - .onError((p, e) -> future.completeExceptionally(e)) + .onTimeout((e) -> { + future.completeExceptionally(e); + transaction.endRequest(); + }) + .onError((p, e) -> { + future.completeExceptionally(e); + transaction.endRequest(); + }) .check(p -> p.getPayload() instanceof COTPPacketData) .unwrap(p -> (COTPPacketData) p.getPayload()) .check(p -> p.getPayload() instanceof S7MessageResponseData) @@ -299,6 +319,14 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> { return future; } + /** + * This method is only called when there is no Response Handler. + */ + @Override + protected void decode(ConversationContext<TPKTPacket> context, TPKTPacket msg) throws Exception { + throw new IllegalStateException("This should not happen!"); + } + @Override public void close(ConversationContext<TPKTPacket> context) { // TODO Implement Closing on Protocol Level diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java index ca8dcce..994b940 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java @@ -125,6 +125,9 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> { // Check if the handler can still be used or should be removed if (registration.getTimeout().isBefore(Instant.now())) { logger.debug("Removing {} as its timed out (was set till {})", registration, registration.getTimeout()); + // Call callback + registration.handleTimeout(); + // Remove iter.remove(); continue; } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java index 16eef84..a377a4d 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java @@ -120,11 +120,6 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq if (expectClazz == null) { throw new ConversationContext.PlcWiringException("expectResponse must be called before first unwrap"); } - if (onTimeoutConsumer == null) { - onTimeoutConsumer = e -> { - // NOOP - }; - } commands.addLast(Either.left(unwrapper)); return new DefaultSendRequestContext<R>(commands, timeout, finisher, request, context, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer); } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java index 38f5eed..fcab31b 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java @@ -79,6 +79,10 @@ public class HandlerRegistration { return timeout; } + public void handleTimeout() { + this.onTimeoutConsumer.accept(new TimeoutException("Timed out while waiting for response")); + } + @Override public String toString() { return "HandlerRegistration#" + id; diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java index 3c252f4..00b97a6 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/BaseOptimizer.java @@ -152,7 +152,10 @@ public abstract class BaseOptimizer { } // As soon as all sub-futures are done, merge the individual responses back to one big response. - CompletableFuture.allOf(subFutures.values().toArray(new CompletableFuture[0])).thenApply(aVoid -> { + CompletableFuture.allOf(subFutures.values().toArray(new CompletableFuture[0])).handle((aVoid, t) -> { + if (t != null) { + parentFuture.completeExceptionally(t); + } Map<PlcRequest, Either<PlcResponse, Exception>> results = new HashMap<>(); for (Map.Entry<PlcRequest, CompletableFuture<PlcResponse>> subFutureEntry : subFutures.entrySet()) { PlcRequest subRequest = subFutureEntry.getKey();
