This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit f257a6d24a132a1f5a3995a2a5e5a2ce8c7ed5b8 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Mon Oct 29 08:27:07 2018 +0100 [driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for granular config of item splitting. --- .../java/ads/connection/AdsTcpPlcConnection.java | 2 +- .../SingleItemToSingleRequestProtocol.java | 101 +++++++++++-- .../SingleItemToSingleRequestProtocolTest.java | 159 +++++++++++++++++++++ 3 files changed, 252 insertions(+), 10 deletions(-) diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java index 5d10a01..0f74c44 100644 --- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java +++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java @@ -117,7 +117,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc pipeline.addLast(new Payload2TcpProtocol()); pipeline.addLast(new Ads2PayloadProtocol()); pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping)); - pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer)); + pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer, SingleItemToSingleRequestProtocol.SplitConfig.builder().dontSplitSubscribe().dontSplitUnsubscribe().build())); } }; } diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java index a6f1471..cbd5b1a 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java @@ -91,20 +91,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { private AtomicLong erroredItems; + private SplitConfig splitConfig; + public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer) { - this(reader, writer, subscriber, timer, true); + this(reader, writer, subscriber, timer, new SplitConfig()); + } + + public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig) { + this(reader, writer, subscriber, timer, splitConfig, true); } - public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, boolean betterImplementationPossible) { - this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible); + public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig, boolean betterImplementationPossible) { + this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), splitConfig, betterImplementationPossible); } - public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) { + public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, SplitConfig splitConfig, boolean betterImplementationPossible) { this.reader = reader; this.writer = writer; this.subscriber = subscriber; this.timer = timer; this.defaultReceiveTimeout = defaultReceiveTimeout; + this.splitConfig = splitConfig; if (betterImplementationPossible) { String callStack = Arrays.stream(Thread.currentThread().getStackTrace()) .map(StackTraceElement::toString) @@ -281,10 +288,10 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { // Create a promise that has to be called multiple times. PromiseCombiner promiseCombiner = new PromiseCombiner(); InternalPlcRequest request = in.getRequest(); - if (request instanceof InternalPlcFieldRequest) { + if (request instanceof InternalPlcFieldRequest && (splitConfig.splitRead || splitConfig.splitWrite || splitConfig.splitSubscription)) { InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request; - if (internalPlcFieldRequest instanceof InternalPlcReadRequest) { + if (internalPlcFieldRequest instanceof InternalPlcReadRequest && splitConfig.splitRead) { InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest; internalPlcReadRequest.getNamedFields().forEach(field -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); @@ -309,7 +316,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { } promiseCombiner.add((Future) subPromise); }); - } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) { + } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest && splitConfig.splitWrite) { InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest; internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); @@ -332,7 +339,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { } promiseCombiner.add((Future) subPromise); }); - } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) { + } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest && splitConfig.splitSubscription) { InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest; internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); @@ -360,7 +367,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { } else { throw new PlcProtocolException("Unmapped request type " + request.getClass()); } - } else if (request instanceof InternalPlcUnsubscriptionRequest) { + } else if (request instanceof InternalPlcUnsubscriptionRequest && splitConfig.splitUnsubscription) { InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request; internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); @@ -556,4 +563,80 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { statistics.put("erroredContainers", erroredContainers.get()); return statistics; } + + public static class SplitConfig { + private final boolean splitRead; + private final boolean splitWrite; + private final boolean splitSubscription; + private final boolean splitUnsubscription; + + public SplitConfig() { + splitRead = true; + splitWrite = true; + splitSubscription = true; + splitUnsubscription = true; + } + + private SplitConfig(boolean splitRead, boolean splitWrite, boolean splitSubscription, boolean splitUnsubscription) { + this.splitRead = splitRead; + this.splitWrite = splitWrite; + this.splitSubscription = splitSubscription; + this.splitUnsubscription = splitUnsubscription; + } + + public static SplitConfigBuilder builder() { + return new SplitConfigBuilder(); + } + + public static class SplitConfigBuilder { + private boolean splitRead = true; + private boolean splitWrite = true; + private boolean splitSubscription = true; + private boolean splitUnsubscription = true; + + public SplitConfigBuilder splitRead() { + splitRead = true; + return this; + } + + public SplitConfigBuilder dontSplitRead() { + splitRead = false; + return this; + } + + public SplitConfigBuilder splitWrite() { + splitWrite = true; + return this; + } + + public SplitConfigBuilder dontSplitWrite() { + splitWrite = false; + return this; + } + + public SplitConfigBuilder splitSubscribe() { + splitSubscription = true; + return this; + } + + public SplitConfigBuilder dontSplitSubscribe() { + splitSubscription = false; + return this; + } + + public SplitConfigBuilder splitUnsubscribe() { + splitUnsubscription = true; + return this; + } + + public SplitConfigBuilder dontSplitUnsubscribe() { + splitUnsubscription = false; + return this; + } + + public SplitConfig build() { + return new SplitConfig(splitRead, splitWrite, splitSubscription, splitUnsubscription); + } + } + } } diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java index f3a3592..cd1216c 100644 --- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java +++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java @@ -83,6 +83,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { mockSubscriber, new HashedWheelTimer(), TimeUnit.SECONDS.toMillis(1), + new SingleItemToSingleRequestProtocol.SplitConfig(), false ); SUT.channelRegistered(channelHandlerContext); @@ -149,6 +150,164 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { } @Nested + class SplitConfig { + + @Nested + class SplitOn { + @BeforeEach + void setUp() throws Exception { + // We setup the SUT with a special configuration + SUT = new SingleItemToSingleRequestProtocol( + mockReader, + mockWriter, + mockSubscriber, + new HashedWheelTimer(), + TimeUnit.SECONDS.toMillis(1), + SingleItemToSingleRequestProtocol.SplitConfig.builder() + .dontSplitRead() + .dontSplitWrite() + .dontSplitSubscribe() + .dontSplitUnsubscribe() + .build(), + false + ); + SUT.channelRegistered(channelHandlerContext); + when(channelHandlerContext.executor().inEventLoop()).thenReturn(true); + } + + @Test + void read() throws Exception { + // Given + // we have a simple read + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(1)).write(eq(msg), any()); + } + + @Test + void write() throws Exception { + // Given + // we have a simple write + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(1)).write(eq(msg), any()); + } + + @Test + void subscribe() throws Exception { + // Given + // we have a simple subscribe + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(1)).write(eq(msg), any()); + } + + @Test + void unsubsribe() throws Exception { + // Given + // we have a simple unsubscribe + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(1)).write(eq(msg), any()); + } + } + + @Nested + class SplitOff { + @BeforeEach + void setUp() throws Exception { + // We setup the SUT with a special configuration + SUT = new SingleItemToSingleRequestProtocol( + mockReader, + mockWriter, + mockSubscriber, + new HashedWheelTimer(), + TimeUnit.SECONDS.toMillis(1), + SingleItemToSingleRequestProtocol.SplitConfig.builder() + .splitRead() + .splitWrite() + .splitSubscribe() + .splitUnsubscribe() + .build(), + false + ); + SUT.channelRegistered(channelHandlerContext); + when(channelHandlerContext.executor().inEventLoop()).thenReturn(true); + } + + @Test + void read() throws Exception { + // Given + // we have a simple read + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(5)).write(any(), any()); + } + + @Test + void write() throws Exception { + // Given + // we have a simple write + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(5)).write(any(), any()); + } + + @Test + void subscribe() throws Exception { + // Given + // we have a simple subscribe + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(3)).write(any(), any()); + } + + @Test + void unsubsribe() throws Exception { + // Given + // we have a simple unsubscribe + PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // then + // we should invoke this only one time + verify(channelHandlerContext, times(3)).write(any(), any()); + } + } + + + } + + @Nested class Roundtrip { @Nested class Read {