This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch feature/TopLevelItemSpliting in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 5951fc5e1ecc208363f653175c922b7ca2a3b53a Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu Sep 27 12:13:30 2018 +0200 [General] SingleItemToSingleRequestProtocol fixed a bunch of bugs and added tests --- .../SingleItemToSingleRequestProtocol.java | 125 ++++++--- .../SingleItemToSingleRequestProtocolTest.java | 290 +++++++++++++++++++++ pom.xml | 34 ++- 3 files changed, 399 insertions(+), 50 deletions(-) diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java index fa578f2..724ffad 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java @@ -36,29 +36,50 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * This layer can be used to split a {@link org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link PlcField}s into multiple subsequent {@link org.apache.plc4x.java.api.messages.PlcRequest}s. */ -// TODO: write test public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class); private PendingWriteQueue queue; - private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems; + // Map to track send subcontainers + private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer; + // Map to map tdpu to original parent container + private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer; + + // Map to track tdpus per container private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap; + // Map to track a list of responses per parent container private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDelivered; private AtomicInteger correlationId; + public SingleItemToSingleRequestProtocol() { + this(true); + } + + public SingleItemToSingleRequestProtocol(boolean betterImplementationPossible) { + if (betterImplementationPossible) { + String callStack = Arrays.stream(Thread.currentThread().getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + LOGGER.warn("Unoptimized Usage of {} detected at:\n{}", this.getClass(), callStack); + LOGGER.info("Consider implementing item splitting native to the protocol."); + } + } + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { this.queue = new PendingWriteQueue(ctx); - this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>(); + this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>(); + this.correlationToParentContainer = new ConcurrentHashMap<>(); this.containerCorrelationIdMap = new ConcurrentHashMap<>(); this.responsesToBeDelivered = new ConcurrentHashMap<>(); this.correlationId = new AtomicInteger(); @@ -68,6 +89,11 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { this.queue.removeAndWriteAll(); + this.sentButUnacknowledgedSubContainer.clear(); + this.correlationToParentContainer.clear(); + this.containerCorrelationIdMap.clear(); + this.responsesToBeDelivered.clear(); + this.correlationId.set(0); super.channelUnregistered(ctx); } @@ -82,20 +108,22 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { // Decoding //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - private void tryFinish(int correlationId, InternalPlcResponse msg) { - PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId); - if (plcRequestContainer == null) { + protected void tryFinish(Integer correlationId, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) { + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId); + LOGGER.info("{} got acknowledged", subPlcRequestContainer); + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId); + if (originalPlcRequestContainer == null) { LOGGER.warn("Unrelated package received {}", msg); return; } - List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>()); + List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new LinkedList<>()); correlatedResponseItems.add(msg); - Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer); + Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer); integers.remove(correlationId); if (integers.isEmpty()) { InternalPlcResponse<?> plcResponse; - if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) { - InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest(); + if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) { + InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest(); HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>(); correlatedResponseItems.stream() @@ -104,8 +132,8 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { .forEach(stringPairMap -> stringPairMap.forEach(fields::put)); plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields); - } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) { - InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest(); + } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) { + InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) originalPlcRequestContainer.getRequest(); HashMap<String, PlcResponseCode> values = new HashMap<>(); correlatedResponseItems.stream() @@ -115,20 +143,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values); } else { - throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest()); + throw new PlcRuntimeException("Unknown type detected " + originalPlcRequestContainer.getRequest()); } - plcRequestContainer.getResponseFuture().complete(plcResponse); - responsesToBeDelivered.remove(plcRequestContainer); + responsesToBeDelivered.remove(originalPlcRequestContainer); + containerCorrelationIdMap.remove(originalPlcRequestContainer); + originalResponseFuture.complete(plcResponse); } } - private void errored(int correlationId, Throwable throwable) { - PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId); - if (plcRequestContainer == null) { - LOGGER.warn("Unrelated error received ", throwable); - return; - } - plcRequestContainer.getResponseFuture().completeExceptionally(throwable); + protected void errored(int correlationId, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) { + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId); + // TODO: cleanup missing maps as the complete response gets canceled now. + LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", plcRequestContainer, correlationId, throwable); + originalResponseFuture.completeExceptionally(throwable); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -153,17 +180,21 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { internalPlcReadRequest.getNamedFields().forEach(field -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); - int tdpu = correlationId.getAndIncrement(); - CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>() + Integer tdpu = correlationId.getAndIncrement(); + CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>(); + // Important: don't chain to above as we want the above to be completed not the result of when complete + correlatedCompletableFuture .thenApply(InternalPlcResponse.class::cast) .whenComplete((internalPlcResponse, throwable) -> { if (throwable != null) { - errored(tdpu, throwable); + errored(tdpu, throwable, in.getResponseFuture()); } else { - tryFinish(tdpu, internalPlcResponse); + tryFinish(tdpu, internalPlcResponse, in.getResponseFuture()); } }); - queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise); + PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture); + correlationToParentContainer.put(tdpu, in); + queue.add(correlatedPlcRequestContainer, subPromise); if (!tdpus.add(tdpu)) { throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu); } @@ -175,17 +206,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); - int tdpu = correlationId.getAndIncrement(); + Integer tdpu = correlationId.getAndIncrement(); CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>() .thenApply(InternalPlcResponse.class::cast) .whenComplete((internalPlcResponse, throwable) -> { if (throwable != null) { - errored(tdpu, throwable); + errored(tdpu, throwable, in.getResponseFuture()); } else { - tryFinish(tdpu, internalPlcResponse); + tryFinish(tdpu, internalPlcResponse, in.getResponseFuture()); } }); - queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise); + PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture); + correlationToParentContainer.put(tdpu, in); + queue.add(correlatedPlcRequestContainer, subPromise); if (!tdpus.add(tdpu)) { throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu); } @@ -232,30 +265,30 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { if (request instanceof CorrelatedPlcRequest) { CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request; - // Add it to the list of sentButUnacknowledgedRequestItems. - sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem); + // Add it to the list of sentButUnacknowledgedSubContainer. + sentButUnacknowledgedSubContainer.put(correlatedPlcRequest.getTdpu(), currentItem); - LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu()); + LOGGER.debug("container with id {} sent: ", correlatedPlcRequest.getTdpu(), currentItem); } } ctx.flush(); } - interface CorrelatedPlcRequest extends InternalPlcRequest { + protected interface CorrelatedPlcRequest extends InternalPlcRequest { int getTdpu(); } - private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest { + protected static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest { - private final int tdpu; + protected final int tdpu; - public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) { + protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) { super(fields); this.tdpu = tdpu; } - public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) { + protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) { LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>(); fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue()); return new CorrelatedPlcReadRequest(fields, tdpu); @@ -267,7 +300,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { } } - private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest { + protected static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest { private final int tdpu; @@ -287,4 +320,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { return tdpu; } } + + // TODO: maybe export to jmx + public Map<String, Integer> getStatistics() { + HashMap<String, Integer> statistics = new HashMap<>(); + statistics.put("queue", queue.size()); + statistics.put("sentButUnacknowledgedSubContainer", sentButUnacknowledgedSubContainer.size()); + statistics.put("correlationToParentContainer", correlationToParentContainer.size()); + statistics.put("containerCorrelationIdMap", containerCorrelationIdMap.size()); + statistics.put("responsesToBeDelivered", responsesToBeDelivered.size()); + statistics.put("currentCorrelationId", correlationId.get()); + return statistics; + } } diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java new file mode 100644 index 0000000..9a1c4ea --- /dev/null +++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java @@ -0,0 +1,290 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + */ + +package org.apache.plc4x.java.base.protocol; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.plc4x.java.api.messages.PlcFieldRequest; +import org.apache.plc4x.java.api.model.PlcField; +import org.apache.plc4x.java.api.types.PlcResponseCode; +import org.apache.plc4x.java.base.messages.*; +import org.apache.plc4x.java.base.messages.items.FieldItem; +import org.assertj.core.api.WithAssertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.*; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class SingleItemToSingleRequestProtocolTest implements WithAssertions { + + @InjectMocks + SingleItemToSingleRequestProtocol SUT; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + ChannelHandlerContext channelHandlerContext; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + ChannelPromise channelPromise; + + @Mock + CompletableFuture<InternalPlcResponse> responseCompletableFuture; + + @BeforeEach + void setUp() throws Exception { + SUT.channelRegistered(channelHandlerContext); + when(channelHandlerContext.executor().inEventLoop()).thenReturn(true); + } + + @AfterEach + void tearDown() throws Exception { + SUT.channelUnregistered(channelHandlerContext); + } + + @Nested + class Misc { + @Test + void channelRegistered() throws Exception { + SUT.channelRegistered(channelHandlerContext); + assertThat(SUT.getStatistics()).containsOnly( + entry("queue", 0), + entry("sentButUnacknowledgedSubContainer", 0), + entry("correlationToParentContainer", 0), + entry("containerCorrelationIdMap", 0), + entry("responsesToBeDelivered", 0), + entry("currentCorrelationId", 0) + ); + } + + @Test + void channelUnregistered() throws Exception { + SUT.channelUnregistered(channelHandlerContext); + assertThat(SUT.getStatistics()).containsOnly( + entry("queue", 0), + entry("sentButUnacknowledgedSubContainer", 0), + entry("correlationToParentContainer", 0), + entry("containerCorrelationIdMap", 0), + entry("responsesToBeDelivered", 0), + entry("currentCorrelationId", 0) + ); + } + + @Test + void channelInactive() throws Exception { + SUT.channelInactive(channelHandlerContext); + assertThat(SUT.getStatistics()).containsOnly( + entry("queue", 0), + entry("sentButUnacknowledgedSubContainer", 0), + entry("correlationToParentContainer", 0), + entry("containerCorrelationIdMap", 0), + entry("responsesToBeDelivered", 0), + entry("currentCorrelationId", 0) + ); + } + } + + @Nested + class Roundtrip { + @Captor + ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor; + + @Test + void simpleRead() throws Exception { + // Given + // we have a simple read + PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // And + // and we simulate that all get responded + verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any()); + List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues(); + capturedDownstreamContainers.forEach(plcRequestContainer -> { + InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest(); + String fieldName = request.getFieldNames().iterator().next(); + CompletableFuture responseFuture = plcRequestContainer.getResponseFuture(); + HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>(); + responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class))); + responseFuture.complete(new DefaultPlcReadResponse(request, responseFields)); + }); + // Then + // our complete container should complete normally + verify(responseCompletableFuture).complete(any()); + // And we should have no memory leak + assertThat(SUT.getStatistics()).containsOnly( + entry("queue", 0), + entry("sentButUnacknowledgedSubContainer", 0), + entry("correlationToParentContainer", 0), + entry("containerCorrelationIdMap", 0), + entry("responsesToBeDelivered", 0), + entry("currentCorrelationId", 5) + ); + } + } + + @Nested + class Decoding { + @Test + void tryFinish() throws Exception { + SUT.tryFinish(1, null, new CompletableFuture<>()); + // TODO: add Assertions. + } + + @Test + void errored() throws Exception { + SUT.errored(1, mock(Throwable.class), new CompletableFuture<>()); + // TODO: add Assertions. + } + } + + @Nested + class Encoding { + + @Captor + ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor; + + @Test + void empty() throws Exception { + // Given + Object msg = null; + // When + SUT.write(channelHandlerContext, msg, channelPromise); + // Then + verify(channelHandlerContext, times(1)).write(null, channelPromise); + } + + @Test + void read() throws Exception { + // Given + PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture); + // When + SUT.write(channelHandlerContext, msg, channelPromise); + // Then + verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any()); + List<PlcRequestContainer> capturedValues = plcRequestContainerArgumentCaptor.getAllValues(); + // We check if every request as exactly one field + assertThat(capturedValues) + .allMatch(plcRequestContainer -> plcRequestContainer.getRequest() instanceof SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest) + .allMatch(plcRequestContainer -> ((SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest) plcRequestContainer.getRequest()).getNumberOfFields() == 1); + // In sum we should see all fields + List<String> fieldNamesList = capturedValues.stream() + .map(PlcRequestContainer::getRequest) + .map(PlcFieldRequest.class::cast) + .map(PlcFieldRequest::getFieldNames) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + // There should be no duplications + assertThat(fieldNamesList).hasSize(5); + assertThat(fieldNamesList).containsExactly( + "readField1", + "readField2", + "readField3", + "readField4", + "readField5" + ); + } + + @Test + void write() throws Exception { + // Given + PlcRequestContainer<TestDefaultPlcWriteRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), responseCompletableFuture); + // When + SUT.write(channelHandlerContext, msg, channelPromise); + // Then + verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any()); + List<PlcRequestContainer> capturedValues = plcRequestContainerArgumentCaptor.getAllValues(); + // We check if every request as exactly one field + assertThat(capturedValues) + .allMatch(plcRequestContainer -> plcRequestContainer.getRequest() instanceof SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest) + .allMatch(plcRequestContainer -> ((SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest) plcRequestContainer.getRequest()).getNumberOfFields() == 1); + // In sum we should see all fields + List<String> fieldNamesList = capturedValues.stream() + .map(PlcRequestContainer::getRequest) + .map(PlcFieldRequest.class::cast) + .map(PlcFieldRequest::getFieldNames) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + // There should be no duplications + assertThat(fieldNamesList).hasSize(5); + assertThat(fieldNamesList).containsExactly( + "writeField1", + "writeField2", + "writeField3", + "writeField4", + "writeField5" + ); + } + + @Test + void trySendingMessages() throws Exception { + SUT.trySendingMessages(channelHandlerContext); + // TODO: add assertions + } + } + + private static class TestDefaultPlcReadRequest extends DefaultPlcReadRequest { + + private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) { + super(fields); + } + + private static TestDefaultPlcReadRequest build() { + LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>(); + fields.put("readField1", mock(PlcField.class)); + fields.put("readField2", mock(PlcField.class)); + fields.put("readField3", mock(PlcField.class)); + fields.put("readField4", mock(PlcField.class)); + fields.put("readField5", mock(PlcField.class)); + return new TestDefaultPlcReadRequest(fields); + } + } + + private static class TestDefaultPlcWriteRequest extends DefaultPlcWriteRequest { + + private TestDefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) { + super(fields); + } + + private static TestDefaultPlcWriteRequest build() { + LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>(); + fields.put("writeField1", Pair.of(mock(PlcField.class), mock(FieldItem.class))); + fields.put("writeField2", Pair.of(mock(PlcField.class), mock(FieldItem.class))); + fields.put("writeField3", Pair.of(mock(PlcField.class), mock(FieldItem.class))); + fields.put("writeField4", Pair.of(mock(PlcField.class), mock(FieldItem.class))); + fields.put("writeField5", Pair.of(mock(PlcField.class), mock(FieldItem.class))); + return new TestDefaultPlcWriteRequest(fields); + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index b4b2867..d1ad775 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,8 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -301,7 +302,7 @@ </goals> <configuration> <rules> - <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" /> + <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/> </rules> </configuration> </execution> @@ -490,7 +491,9 @@ </goals> <configuration> <url>https://stackpath.bootstrapcdn.com/bootswatch/${bootstrap.version}/flatly/bootstrap.min.css</url> - <outputDirectory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/</outputDirectory> + <outputDirectory> + ${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/ + </outputDirectory> <outputFileName>bootstrap.min.css</outputFileName> </configuration> </execution> @@ -501,7 +504,9 @@ <goal>wget</goal> </goals> <configuration> - <url>https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip</url> + <url> + https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip + </url> <unpack>true</unpack> <outputDirectory>${project.build.directory}/dependency/fontawesome</outputDirectory> </configuration> @@ -523,14 +528,18 @@ <outputDirectory>${project.build.directory}/site</outputDirectory> <resources> <resource> - <directory>${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}</directory> + <directory> + ${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version} + </directory> <targetPath>${project.build.directory}/site/js</targetPath> <includes> <include>anchor.min.js</include> </includes> </resource> <resource> - <directory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}</directory> + <directory> + ${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version} + </directory> <includes> <include>css/bootstrap.min.css</include> <include>js/bootstrap.min.js</include> @@ -538,17 +547,22 @@ </includes> </resource> <resource> - <directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web</directory> + <directory> + ${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web + </directory> <includes> <include>css/all.min.css</include> </includes> </resource> <resource> - <directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts</directory> + <directory> + ${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts + </directory> <targetPath>${project.build.directory}/site/fonts</targetPath> </resource> <resource> - <directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}</directory> + <directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version} + </directory> <targetPath>${project.build.directory}/site/js</targetPath> <includes> <include>jquery.min.js</include> @@ -860,7 +874,7 @@ </goals> </pluginExecutionFilter> <action> - <ignore /> + <ignore/> </action> </pluginExecution> </pluginExecutions>