This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 5951fc5e1ecc208363f653175c922b7ca2a3b53a
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu Sep 27 12:13:30 2018 +0200

    [General] SingleItemToSingleRequestProtocol fixed a bunch of bugs and added 
tests
---
 .../SingleItemToSingleRequestProtocol.java         | 125 ++++++---
 .../SingleItemToSingleRequestProtocolTest.java     | 290 +++++++++++++++++++++
 pom.xml                                            |  34 ++-
 3 files changed, 399 insertions(+), 50 deletions(-)

diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index fa578f2..724ffad 100644
--- 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -36,29 +36,50 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * This layer can be used to split a {@link 
org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link 
PlcField}s into multiple subsequent {@link 
org.apache.plc4x.java.api.messages.PlcRequest}s.
  */
-// TODO: write test
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = 
LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, 
InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
+    // Map to track sent sub-containers
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, 
InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
 
+    // Maps each tdpu (correlation id) to its original parent container
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, 
InternalPlcResponse<?>>> correlationToParentContainer;
+
+    // Map to track tdpus per container
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> 
containerCorrelationIdMap;
 
+    // Map to track a list of responses per parent container
     private ConcurrentMap<PlcRequestContainer<?, ?>, 
List<InternalPlcResponse<?>>> responsesToBeDelivered;
 
     private AtomicInteger correlationId;
 
+    public SingleItemToSingleRequestProtocol() {
+        this(true);
+    }
+
+    public SingleItemToSingleRequestProtocol(boolean 
betterImplementationPossible) {
+        if (betterImplementationPossible) {
+            String callStack = 
Arrays.stream(Thread.currentThread().getStackTrace())
+                .map(StackTraceElement::toString)
+                .collect(Collectors.joining("\n"));
+            LOGGER.warn("Unoptimized Usage of {} detected at:\n{}", 
this.getClass(), callStack);
+            LOGGER.info("Consider implementing item splitting native to the 
protocol.");
+        }
+    }
+
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         this.queue = new PendingWriteQueue(ctx);
-        this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>();
+        this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>();
+        this.correlationToParentContainer = new ConcurrentHashMap<>();
         this.containerCorrelationIdMap = new ConcurrentHashMap<>();
         this.responsesToBeDelivered = new ConcurrentHashMap<>();
         this.correlationId = new AtomicInteger();
@@ -68,6 +89,11 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
         this.queue.removeAndWriteAll();
+        this.sentButUnacknowledgedSubContainer.clear();
+        this.correlationToParentContainer.clear();
+        this.containerCorrelationIdMap.clear();
+        this.responsesToBeDelivered.clear();
+        this.correlationId.set(0);
         super.channelUnregistered(ctx);
     }
 
@@ -82,20 +108,22 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
     // Decoding
     
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    private void tryFinish(int correlationId, InternalPlcResponse msg) {
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> 
plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (plcRequestContainer == null) {
+    protected void tryFinish(Integer correlationId, InternalPlcResponse msg, 
CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> 
subPlcRequestContainer = 
sentButUnacknowledgedSubContainer.remove(correlationId);
+        LOGGER.info("{} got acknowledged", subPlcRequestContainer);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> 
originalPlcRequestContainer = 
correlationToParentContainer.remove(correlationId);
+        if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated package received {}", msg);
             return;
         }
-        List<InternalPlcResponse<?>> correlatedResponseItems = 
responsesToBeDelivered.computeIfAbsent(plcRequestContainer, ignore -> new 
LinkedList<>());
+        List<InternalPlcResponse<?>> correlatedResponseItems = 
responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> 
new LinkedList<>());
         correlatedResponseItems.add(msg);
-        Set<Integer> integers = 
containerCorrelationIdMap.get(plcRequestContainer);
+        Set<Integer> integers = 
containerCorrelationIdMap.get(originalPlcRequestContainer);
         integers.remove(correlationId);
         if (integers.isEmpty()) {
             InternalPlcResponse<?> plcResponse;
-            if (plcRequestContainer.getRequest() instanceof 
InternalPlcReadRequest) {
-                InternalPlcReadRequest internalPlcReadRequest = 
(InternalPlcReadRequest) plcRequestContainer.getRequest();
+            if (originalPlcRequestContainer.getRequest() instanceof 
InternalPlcReadRequest) {
+                InternalPlcReadRequest internalPlcReadRequest = 
(InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
                 HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new 
HashMap<>();
 
                 correlatedResponseItems.stream()
@@ -104,8 +132,8 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
                     .forEach(stringPairMap -> 
stringPairMap.forEach(fields::put));
 
                 plcResponse = new 
DefaultPlcReadResponse(internalPlcReadRequest, fields);
-            } else if (plcRequestContainer.getRequest() instanceof 
InternalPlcWriteRequest) {
-                InternalPlcWriteRequest internalPlcWriteRequest = 
(InternalPlcWriteRequest) plcRequestContainer.getRequest();
+            } else if (originalPlcRequestContainer.getRequest() instanceof 
InternalPlcWriteRequest) {
+                InternalPlcWriteRequest internalPlcWriteRequest = 
(InternalPlcWriteRequest) originalPlcRequestContainer.getRequest();
                 HashMap<String, PlcResponseCode> values = new HashMap<>();
 
                 correlatedResponseItems.stream()
@@ -115,20 +143,19 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
 
                 plcResponse = new 
DefaultPlcWriteResponse(internalPlcWriteRequest, values);
             } else {
-                throw new PlcRuntimeException("Unknown type detected " + 
plcRequestContainer.getRequest());
+                throw new PlcRuntimeException("Unknown type detected " + 
originalPlcRequestContainer.getRequest());
             }
-            plcRequestContainer.getResponseFuture().complete(plcResponse);
-            responsesToBeDelivered.remove(plcRequestContainer);
+            responsesToBeDelivered.remove(originalPlcRequestContainer);
+            containerCorrelationIdMap.remove(originalPlcRequestContainer);
+            originalResponseFuture.complete(plcResponse);
         }
     }
 
-    private void errored(int correlationId, Throwable throwable) {
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> 
plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (plcRequestContainer == null) {
-            LOGGER.warn("Unrelated error received ", throwable);
-            return;
-        }
-        
plcRequestContainer.getResponseFuture().completeExceptionally(throwable);
+    protected void errored(int correlationId, Throwable throwable, 
CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> 
plcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+        // TODO: clean up the remaining tracking maps, as the complete response gets 
canceled now.
+        LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", 
plcRequestContainer, correlationId, throwable);
+        originalResponseFuture.completeExceptionally(throwable);
     }
 
     
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -153,17 +180,21 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
                     internalPlcReadRequest.getNamedFields().forEach(field -> {
                         ChannelPromise subPromise = new 
DefaultChannelPromise(promise.channel());
 
-                        int tdpu = correlationId.getAndIncrement();
-                        CompletableFuture<InternalPlcResponse> 
correlatedCompletableFuture = new CompletableFuture<>()
+                        Integer tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> 
correlatedCompletableFuture = new CompletableFuture<>();
+                        // Important: don't chain this onto the declaration above; the future 
created above must be the one that gets completed, not the result of whenComplete
+                        correlatedCompletableFuture
                             .thenApply(InternalPlcResponse.class::cast)
                             .whenComplete((internalPlcResponse, throwable) -> {
                                 if (throwable != null) {
-                                    errored(tdpu, throwable);
+                                    errored(tdpu, throwable, 
in.getResponseFuture());
                                 } else {
-                                    tryFinish(tdpu, internalPlcResponse);
+                                    tryFinish(tdpu, internalPlcResponse, 
in.getResponseFuture());
                                 }
                             });
-                        queue.add(new 
PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), 
correlatedCompletableFuture), subPromise);
+                        PlcRequestContainer<CorrelatedPlcReadRequest, 
InternalPlcResponse> correlatedPlcRequestContainer = new 
PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), 
correlatedCompletableFuture);
+                        correlationToParentContainer.put(tdpu, in);
+                        queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
                             throw new IllegalStateException("AtomicInteger 
should not create duplicated ids: " + tdpu);
                         }
@@ -175,17 +206,19 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
                     
internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
                         ChannelPromise subPromise = new 
DefaultChannelPromise(promise.channel());
 
-                        int tdpu = correlationId.getAndIncrement();
+                        Integer tdpu = correlationId.getAndIncrement();
                         CompletableFuture<InternalPlcResponse> 
correlatedCompletableFuture = new CompletableFuture<>()
                             .thenApply(InternalPlcResponse.class::cast)
                             .whenComplete((internalPlcResponse, throwable) -> {
                                 if (throwable != null) {
-                                    errored(tdpu, throwable);
+                                    errored(tdpu, throwable, 
in.getResponseFuture());
                                 } else {
-                                    tryFinish(tdpu, internalPlcResponse);
+                                    tryFinish(tdpu, internalPlcResponse, 
in.getResponseFuture());
                                 }
                             });
-                        queue.add(new 
PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), 
correlatedCompletableFuture), subPromise);
+                        PlcRequestContainer<CorrelatedPlcWriteRequest, 
InternalPlcResponse> correlatedPlcRequestContainer = new 
PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), 
correlatedCompletableFuture);
+                        correlationToParentContainer.put(tdpu, in);
+                        queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
                             throw new IllegalStateException("AtomicInteger 
should not create duplicated ids: " + tdpu);
                         }
@@ -232,30 +265,30 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
             if (request instanceof CorrelatedPlcRequest) {
                 CorrelatedPlcRequest correlatedPlcRequest = 
(CorrelatedPlcRequest) request;
 
-                // Add it to the list of sentButUnacknowledgedRequestItems.
-                
sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), 
currentItem);
+                // Add it to the list of sentButUnacknowledgedSubContainer.
+                
sentButUnacknowledgedSubContainer.put(correlatedPlcRequest.getTdpu(), 
currentItem);
 
-                LOGGER.debug("Item Message with id {} sent", 
correlatedPlcRequest.getTdpu());
+                LOGGER.debug("container with id {} sent: ", 
correlatedPlcRequest.getTdpu(), currentItem);
             }
         }
         ctx.flush();
     }
 
-    interface CorrelatedPlcRequest extends InternalPlcRequest {
+    protected interface CorrelatedPlcRequest extends InternalPlcRequest {
 
         int getTdpu();
     }
 
-    private static class CorrelatedPlcReadRequest extends 
DefaultPlcReadRequest implements CorrelatedPlcRequest {
+    protected static class CorrelatedPlcReadRequest extends 
DefaultPlcReadRequest implements CorrelatedPlcRequest {
 
-        private final int tdpu;
+        protected final int tdpu;
 
-        public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> 
fields, int tdpu) {
+        protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> 
fields, int tdpu) {
             super(fields);
             this.tdpu = tdpu;
         }
 
-        public static CorrelatedPlcReadRequest of(Pair<String, PlcField> 
stringPlcFieldPair, int tdpu) {
+        protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> 
stringPlcFieldPair, int tdpu) {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
             fields.put(stringPlcFieldPair.getKey(), 
stringPlcFieldPair.getValue());
             return new CorrelatedPlcReadRequest(fields, tdpu);
@@ -267,7 +300,7 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
         }
     }
 
-    private static class CorrelatedPlcWriteRequest extends 
DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+    protected static class CorrelatedPlcWriteRequest extends 
DefaultPlcWriteRequest implements CorrelatedPlcRequest {
 
         private final int tdpu;
 
@@ -287,4 +320,16 @@ public class SingleItemToSingleRequestProtocol extends 
ChannelDuplexHandler {
             return tdpu;
         }
     }
+
+    // TODO: maybe export to jmx
+    public Map<String, Integer> getStatistics() {
+        HashMap<String, Integer> statistics = new HashMap<>();
+        statistics.put("queue", queue.size());
+        statistics.put("sentButUnacknowledgedSubContainer", 
sentButUnacknowledgedSubContainer.size());
+        statistics.put("correlationToParentContainer", 
correlationToParentContainer.size());
+        statistics.put("containerCorrelationIdMap", 
containerCorrelationIdMap.size());
+        statistics.put("responsesToBeDelivered", 
responsesToBeDelivered.size());
+        statistics.put("currentCorrelationId", correlationId.get());
+        return statistics;
+    }
 }
diff --git 
a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
 
b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
new file mode 100644
index 0000000..9a1c4ea
--- /dev/null
+++ 
b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -0,0 +1,290 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+
+package org.apache.plc4x.java.base.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.messages.PlcFieldRequest;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.*;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class SingleItemToSingleRequestProtocolTest implements WithAssertions {
+
+    @InjectMocks
+    SingleItemToSingleRequestProtocol SUT;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    ChannelHandlerContext channelHandlerContext;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    ChannelPromise channelPromise;
+
+    @Mock
+    CompletableFuture<InternalPlcResponse> responseCompletableFuture;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        SUT.channelRegistered(channelHandlerContext);
+        when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        SUT.channelUnregistered(channelHandlerContext);
+    }
+
+    @Nested
+    class Misc {
+        @Test
+        void channelRegistered() throws Exception {
+            SUT.channelRegistered(channelHandlerContext);
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 0)
+            );
+        }
+
+        @Test
+        void channelUnregistered() throws Exception {
+            SUT.channelUnregistered(channelHandlerContext);
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 0)
+            );
+        }
+
+        @Test
+        void channelInactive() throws Exception {
+            SUT.channelInactive(channelHandlerContext);
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 0)
+            );
+        }
+    }
+
+    @Nested
+    class Roundtrip {
+        @Captor
+        ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+        @Test
+        void simpleRead() throws Exception {
+            // Given
+            // we have a simple read
+            PlcRequestContainer<TestDefaultPlcReadRequest, 
InternalPlcResponse> msg = new 
PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), 
responseCompletableFuture);
+            // When
+            // we write this
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // And
+            // we simulate that every downstream request gets a response
+            verify(channelHandlerContext, 
times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedDownstreamContainers = 
plcRequestContainerArgumentCaptor.getAllValues();
+            capturedDownstreamContainers.forEach(plcRequestContainer -> {
+                InternalPlcReadRequest request = (InternalPlcReadRequest) 
plcRequestContainer.getRequest();
+                String fieldName = request.getFieldNames().iterator().next();
+                CompletableFuture responseFuture = 
plcRequestContainer.getResponseFuture();
+                HashMap<String, Pair<PlcResponseCode, FieldItem>> 
responseFields = new HashMap<>();
+                responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, 
mock(FieldItem.class)));
+                responseFuture.complete(new DefaultPlcReadResponse(request, 
responseFields));
+            });
+            // Then
+            // our complete container should complete normally
+            verify(responseCompletableFuture).complete(any());
+            // And we should have no memory leak
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 5)
+            );
+        }
+    }
+
+    @Nested
+    class Decoding {
+        @Test
+        void tryFinish() throws Exception {
+            SUT.tryFinish(1, null, new CompletableFuture<>());
+            // TODO: add Assertions.
+        }
+
+        @Test
+        void errored() throws Exception {
+            SUT.errored(1, mock(Throwable.class), new CompletableFuture<>());
+            // TODO: add Assertions.
+        }
+    }
+
+    @Nested
+    class Encoding {
+
+        @Captor
+        ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+        @Test
+        void empty() throws Exception {
+            // Given
+            Object msg = null;
+            // When
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // Then
+            verify(channelHandlerContext, times(1)).write(null, 
channelPromise);
+        }
+
+        @Test
+        void read() throws Exception {
+            // Given
+            PlcRequestContainer<TestDefaultPlcReadRequest, 
InternalPlcResponse> msg = new 
PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), 
responseCompletableFuture);
+            // When
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // Then
+            verify(channelHandlerContext, 
times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedValues = 
plcRequestContainerArgumentCaptor.getAllValues();
+            // We check if every request has exactly one field
+            assertThat(capturedValues)
+                .allMatch(plcRequestContainer -> 
plcRequestContainer.getRequest() instanceof 
SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest)
+                .allMatch(plcRequestContainer -> 
((SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest) 
plcRequestContainer.getRequest()).getNumberOfFields() == 1);
+            // In sum we should see all fields
+            List<String> fieldNamesList = capturedValues.stream()
+                .map(PlcRequestContainer::getRequest)
+                .map(PlcFieldRequest.class::cast)
+                .map(PlcFieldRequest::getFieldNames)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+            // There should be no duplications
+            assertThat(fieldNamesList).hasSize(5);
+            assertThat(fieldNamesList).containsExactly(
+                "readField1",
+                "readField2",
+                "readField3",
+                "readField4",
+                "readField5"
+            );
+        }
+
+        @Test
+        void write() throws Exception {
+            // Given
+            PlcRequestContainer<TestDefaultPlcWriteRequest, 
InternalPlcResponse> msg = new 
PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), 
responseCompletableFuture);
+            // When
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // Then
+            verify(channelHandlerContext, 
times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedValues = 
plcRequestContainerArgumentCaptor.getAllValues();
+            // We check if every request has exactly one field
+            assertThat(capturedValues)
+                .allMatch(plcRequestContainer -> 
plcRequestContainer.getRequest() instanceof 
SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest)
+                .allMatch(plcRequestContainer -> 
((SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest) 
plcRequestContainer.getRequest()).getNumberOfFields() == 1);
+            // In sum we should see all fields
+            List<String> fieldNamesList = capturedValues.stream()
+                .map(PlcRequestContainer::getRequest)
+                .map(PlcFieldRequest.class::cast)
+                .map(PlcFieldRequest::getFieldNames)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+            // There should be no duplications
+            assertThat(fieldNamesList).hasSize(5);
+            assertThat(fieldNamesList).containsExactly(
+                "writeField1",
+                "writeField2",
+                "writeField3",
+                "writeField4",
+                "writeField5"
+            );
+        }
+
+        @Test
+        void trySendingMessages() throws Exception {
+            SUT.trySendingMessages(channelHandlerContext);
+            // TODO: add assertions
+        }
+    }
+
+    private static class TestDefaultPlcReadRequest extends 
DefaultPlcReadRequest {
+
+        private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> 
fields) {
+            super(fields);
+        }
+
+        private static TestDefaultPlcReadRequest build() {
+            LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+            fields.put("readField1", mock(PlcField.class));
+            fields.put("readField2", mock(PlcField.class));
+            fields.put("readField3", mock(PlcField.class));
+            fields.put("readField4", mock(PlcField.class));
+            fields.put("readField5", mock(PlcField.class));
+            return new TestDefaultPlcReadRequest(fields);
+        }
+    }
+
+    private static class TestDefaultPlcWriteRequest extends 
DefaultPlcWriteRequest {
+
+        private TestDefaultPlcWriteRequest(LinkedHashMap<String, 
Pair<PlcField, FieldItem>> fields) {
+            super(fields);
+        }
+
+        private static TestDefaultPlcWriteRequest build() {
+            LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new 
LinkedHashMap<>();
+            fields.put("writeField1", Pair.of(mock(PlcField.class), 
mock(FieldItem.class)));
+            fields.put("writeField2", Pair.of(mock(PlcField.class), 
mock(FieldItem.class)));
+            fields.put("writeField3", Pair.of(mock(PlcField.class), 
mock(FieldItem.class)));
+            fields.put("writeField4", Pair.of(mock(PlcField.class), 
mock(FieldItem.class)));
+            fields.put("writeField5", Pair.of(mock(PlcField.class), 
mock(FieldItem.class)));
+            return new TestDefaultPlcWriteRequest(fields);
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b4b2867..d1ad775 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,8 @@
   limitations under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
 
@@ -301,7 +302,7 @@
             </goals>
             <configuration>
               <rules>
-                <banVulnerable 
implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" 
/>
+                <banVulnerable 
implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
               </rules>
             </configuration>
           </execution>
@@ -490,7 +491,9 @@
             </goals>
             <configuration>
               
<url>https://stackpath.bootstrapcdn.com/bootswatch/${bootstrap.version}/flatly/bootstrap.min.css</url>
-              
<outputDirectory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/</outputDirectory>
+              <outputDirectory>
+                
${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/
+              </outputDirectory>
               <outputFileName>bootstrap.min.css</outputFileName>
             </configuration>
           </execution>
@@ -501,7 +504,9 @@
               <goal>wget</goal>
             </goals>
             <configuration>
-              
<url>https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip</url>
+              <url>
+                
https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip
+              </url>
               <unpack>true</unpack>
               
<outputDirectory>${project.build.directory}/dependency/fontawesome</outputDirectory>
             </configuration>
@@ -523,14 +528,18 @@
               
<outputDirectory>${project.build.directory}/site</outputDirectory>
               <resources>
                 <resource>
-                  
<directory>${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}</directory>
+                  <directory>
+                    
${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}
+                  </directory>
                   <targetPath>${project.build.directory}/site/js</targetPath>
                   <includes>
                     <include>anchor.min.js</include>
                   </includes>
                 </resource>
                 <resource>
-                  
<directory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}</directory>
+                  <directory>
+                    
${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}
+                  </directory>
                   <includes>
                     <include>css/bootstrap.min.css</include>
                     <include>js/bootstrap.min.js</include>
@@ -538,17 +547,22 @@
                   </includes>
                 </resource>
                 <resource>
-                  
<directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web</directory>
+                  <directory>
+                    
${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web
+                  </directory>
                   <includes>
                     <include>css/all.min.css</include>
                   </includes>
                 </resource>
                 <resource>
-                  
<directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts</directory>
+                  <directory>
+                    
${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts
+                  </directory>
                   
<targetPath>${project.build.directory}/site/fonts</targetPath>
                 </resource>
                 <resource>
-                  
<directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}</directory>
+                  
<directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}
+                  </directory>
                   <targetPath>${project.build.directory}/site/js</targetPath>
                   <includes>
                     <include>jquery.min.js</include>
@@ -860,7 +874,7 @@
                     </goals>
                   </pluginExecutionFilter>
                   <action>
-                    <ignore />
+                    <ignore/>
                   </action>
                 </pluginExecution>
               </pluginExecutions>

Reply via email to