This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push: new 441bcde [General] SingleItemToSingleRequestProtocol fixed potential memory leak by applying receive timeouts 441bcde is described below commit 441bcde36196a3dd61296dec513f81867ecf6dc6 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Fri Sep 28 10:38:34 2018 +0200 [General] SingleItemToSingleRequestProtocol fixed potential memory leak by applying receive timeouts --- .../java/api/exceptions/PlcTimeoutException.java | 46 ++++++++ .../SingleItemToSingleRequestProtocol.java | 121 +++++++++++++++++++-- .../SingleItemToSingleRequestProtocolTest.java | 80 +++++++++++--- 3 files changed, 222 insertions(+), 25 deletions(-) diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/exceptions/PlcTimeoutException.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/exceptions/PlcTimeoutException.java new file mode 100644 index 0000000..ddba534 --- /dev/null +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/exceptions/PlcTimeoutException.java @@ -0,0 +1,46 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + */ + +package org.apache.plc4x.java.api.exceptions; + +import java.util.concurrent.TimeUnit; + +/** + * Can be thrown when something times out. + */ +public class PlcTimeoutException extends PlcRuntimeException { + private final long timeout; + + /** + * Indicates something timed out. + * + * @param timeout in nanoseconds. + */ + public PlcTimeoutException(long timeout) { + super("Timeout reached after " + TimeUnit.NANOSECONDS.toMillis(timeout) + "ms"); + this.timeout = timeout; + } + + /** + * @return the timeout in nanoseconds. + */ + public long getTimeout() { + return timeout; + } +} diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java index 724ffad..bdd1a7f 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java @@ -19,11 +19,15 @@ package org.apache.plc4x.java.base.protocol; import io.netty.channel.*; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.PromiseCombiner; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; +import org.apache.plc4x.java.api.exceptions.PlcTimeoutException; import org.apache.plc4x.java.api.model.PlcField; import org.apache.plc4x.java.api.types.PlcResponseCode; import org.apache.plc4x.java.base.messages.*; @@ -35,7 +39,9 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** @@ -47,6 +53,13 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { private PendingWriteQueue queue; + private Timer timer; + + // TODO: maybe better get from map + private long defaultReceiveTimeout; + + private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>, Timeout> scheduledTimeouts; + // Map to track send subcontainers private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer; @@ -59,13 +72,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { // Map to track a list of responses per parent container private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDelivered; - private AtomicInteger correlationId; + private AtomicInteger correlationIdGenerator; + + // TODO: maybe put in map per day or per hour + private AtomicLong deliveredContainers; + + private AtomicLong erroredContainers; + + private AtomicLong deliveredItems; + + private AtomicLong erroredItems; public SingleItemToSingleRequestProtocol() { this(true); } public SingleItemToSingleRequestProtocol(boolean betterImplementationPossible) { + this(TimeUnit.SECONDS.toMillis(30), betterImplementationPossible); + } + + public SingleItemToSingleRequestProtocol(long defaultReceiveTimeout, boolean betterImplementationPossible) { + this.defaultReceiveTimeout = defaultReceiveTimeout; if (betterImplementationPossible) { String callStack = Arrays.stream(Thread.currentThread().getStackTrace()) .map(StackTraceElement::toString) @@ -78,22 +105,34 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { this.queue = new PendingWriteQueue(ctx); + this.timer = new HashedWheelTimer(); + this.scheduledTimeouts = new ConcurrentHashMap<>(); this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>(); this.correlationToParentContainer = new ConcurrentHashMap<>(); this.containerCorrelationIdMap = new ConcurrentHashMap<>(); this.responsesToBeDelivered = new ConcurrentHashMap<>(); - this.correlationId = new AtomicInteger(); + this.correlationIdGenerator = new AtomicInteger(); + this.deliveredItems = new AtomicLong(); + this.erroredItems = new AtomicLong(); + this.deliveredContainers = new AtomicLong(); + this.erroredContainers = new AtomicLong(); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { this.queue.removeAndWriteAll(); + this.timer.stop(); + this.scheduledTimeouts.clear(); this.sentButUnacknowledgedSubContainer.clear(); this.correlationToParentContainer.clear(); this.containerCorrelationIdMap.clear(); this.responsesToBeDelivered.clear(); - this.correlationId.set(0); + this.correlationIdGenerator.set(0); + this.deliveredItems.set(0); + this.erroredItems.set(0); + this.deliveredContainers.set(0); + this.erroredContainers.set(0); super.channelUnregistered(ctx); } @@ -101,6 +140,17 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Send everything so we get a proper failure for those pending writes this.queue.removeAndWriteAll(); + this.timer.stop(); + this.scheduledTimeouts.clear(); + this.sentButUnacknowledgedSubContainer.clear(); + this.correlationToParentContainer.clear(); + this.containerCorrelationIdMap.clear(); + this.responsesToBeDelivered.clear(); + this.correlationIdGenerator.set(0); + this.deliveredItems.set(0); + this.erroredItems.set(0); + this.deliveredContainers.set(0); + this.erroredContainers.set(0); super.channelInactive(ctx); } @@ -109,6 +159,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// protected void tryFinish(Integer correlationId, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) { + deliveredItems.incrementAndGet(); PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId); LOGGER.info("{} got acknowledged", subPlcRequestContainer); PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId); @@ -121,6 +172,12 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer); integers.remove(correlationId); if (integers.isEmpty()) { + deliveredContainers.incrementAndGet(); + Timeout timeout = scheduledTimeouts.remove(originalPlcRequestContainer); + if (timeout != null) { + timeout.cancel(); + } + InternalPlcResponse<?> plcResponse; if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) { InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest(); @@ -152,10 +209,25 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { } protected void errored(int correlationId, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) { - PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId); - // TODO: cleanup missing maps as the complete response gets canceled now. - LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", plcRequestContainer, correlationId, throwable); - originalResponseFuture.completeExceptionally(throwable); + erroredItems.incrementAndGet(); + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId); + LOGGER.info("{} got errored", subPlcRequestContainer); + + + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId); + if (originalPlcRequestContainer == null) { + LOGGER.warn("Unrelated error received correlationId:{}", correlationId, throwable); + } else { + erroredContainers.incrementAndGet(); + Timeout timeout = scheduledTimeouts.remove(originalPlcRequestContainer); + if (timeout != null) { + timeout.cancel(); + } + responsesToBeDelivered.remove(originalPlcRequestContainer); + containerCorrelationIdMap.remove(originalPlcRequestContainer); + LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", correlationToParentContainer, correlationId, throwable); + originalResponseFuture.completeExceptionally(throwable); + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -169,6 +241,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg; Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>()); + Timeout timeout = timer.newTimeout(timeout_ -> handleTimeout(timeout_, in, tdpus, System.nanoTime()), defaultReceiveTimeout, TimeUnit.MILLISECONDS); + scheduledTimeouts.put(in, timeout); + // Create a promise that has to be called multiple times. PromiseCombiner promiseCombiner = new PromiseCombiner(); InternalPlcRequest request = in.getRequest(); @@ -180,7 +255,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { internalPlcReadRequest.getNamedFields().forEach(field -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); - Integer tdpu = correlationId.getAndIncrement(); + Integer tdpu = correlationIdGenerator.getAndIncrement(); CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>(); // Important: don't chain to above as we want the above to be completed not the result of when complete correlatedCompletableFuture @@ -206,7 +281,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> { ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); - Integer tdpu = correlationId.getAndIncrement(); + Integer tdpu = correlationIdGenerator.getAndIncrement(); CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>() .thenApply(InternalPlcResponse.class::cast) .whenComplete((internalPlcResponse, throwable) -> { @@ -244,6 +319,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { // Helpers //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + @SuppressWarnings("unchecked") protected synchronized void trySendingMessages(ChannelHandlerContext ctx) { while (queue.size() > 0) { // Get the RequestItem that is up next in the queue. @@ -274,6 +350,23 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { ctx.flush(); } + private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in, Set<Integer> tdpus, long scheduledAt) { + if (timeout.isCancelled()) { + LOGGER.debug("container {} with timeout {} got canceled", in, timeout); + return; + } + LOGGER.warn("container {} timed out:{}", in, timeout); + erroredContainers.incrementAndGet(); + responsesToBeDelivered.remove(in); + containerCorrelationIdMap.remove(in); + tdpus.forEach(tdpu -> { + erroredItems.incrementAndGet(); + sentButUnacknowledgedSubContainer.remove(tdpu); + correlationToParentContainer.remove(tdpu); + }); + in.getResponseFuture().completeExceptionally(new PlcTimeoutException(System.nanoTime() - scheduledAt)); + } + protected interface CorrelatedPlcRequest extends InternalPlcRequest { int getTdpu(); @@ -322,14 +415,18 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { } // TODO: maybe export to jmx - public Map<String, Integer> getStatistics() { - HashMap<String, Integer> statistics = new HashMap<>(); + public Map<String, Number> getStatistics() { + HashMap<String, Number> statistics = new HashMap<>(); statistics.put("queue", queue.size()); statistics.put("sentButUnacknowledgedSubContainer", sentButUnacknowledgedSubContainer.size()); statistics.put("correlationToParentContainer", correlationToParentContainer.size()); statistics.put("containerCorrelationIdMap", containerCorrelationIdMap.size()); statistics.put("responsesToBeDelivered", responsesToBeDelivered.size()); - statistics.put("currentCorrelationId", correlationId.get()); + statistics.put("correlationIdGenerator", correlationIdGenerator.get()); + statistics.put("deliveredItems", deliveredItems.get()); + statistics.put("erroredItems", erroredItems.get()); + statistics.put("deliveredContainers", deliveredContainers.get()); + statistics.put("erroredContainers", erroredContainers.get()); return statistics; } } diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java index 9a1c4ea..1185ace 100644 --- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java +++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.ArgumentMatchers.any; @@ -50,7 +51,7 @@ import static org.mockito.Mockito.*; class SingleItemToSingleRequestProtocolTest implements WithAssertions { @InjectMocks - SingleItemToSingleRequestProtocol SUT; + SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(TimeUnit.SECONDS.toMillis(1), false); @Mock(answer = Answers.RETURNS_DEEP_STUBS) ChannelHandlerContext channelHandlerContext; @@ -83,7 +84,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { entry("correlationToParentContainer", 0), entry("containerCorrelationIdMap", 0), entry("responsesToBeDelivered", 0), - entry("currentCorrelationId", 0) + entry("correlationIdGenerator", 0), + entry("deliveredItems", 0L), + entry("erroredItems", 0L), + entry("erroredContainers", 0L), + entry("deliveredContainers", 0L) ); } @@ -96,7 +101,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { entry("correlationToParentContainer", 0), entry("containerCorrelationIdMap", 0), entry("responsesToBeDelivered", 0), - entry("currentCorrelationId", 0) + entry("correlationIdGenerator", 0), + entry("deliveredItems", 0L), + entry("erroredItems", 0L), + entry("deliveredContainers", 0L), + entry("erroredContainers", 0L) ); } @@ -109,7 +118,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { entry("correlationToParentContainer", 0), entry("containerCorrelationIdMap", 0), entry("responsesToBeDelivered", 0), - entry("currentCorrelationId", 0) + entry("correlationIdGenerator", 0), + entry("deliveredItems", 0L), + entry("erroredItems", 0L), + entry("deliveredContainers", 0L), + entry("erroredContainers", 0L) ); } } @@ -131,14 +144,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { // and we simulate that all get responded verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any()); List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues(); - capturedDownstreamContainers.forEach(plcRequestContainer -> { - InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest(); - String fieldName = request.getFieldNames().iterator().next(); - CompletableFuture responseFuture = plcRequestContainer.getResponseFuture(); - HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>(); - responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class))); - responseFuture.complete(new DefaultPlcReadResponse(request, responseFields)); - }); + capturedDownstreamContainers.forEach(this::produceReadResponse); // Then // our complete container should complete normally verify(responseCompletableFuture).complete(any()); @@ -149,9 +155,57 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions { entry("correlationToParentContainer", 0), entry("containerCorrelationIdMap", 0), entry("responsesToBeDelivered", 0), - entry("currentCorrelationId", 5) + entry("correlationIdGenerator", 5), + entry("erroredItems", 0L), + entry("deliveredItems", 5L), + entry("deliveredContainers", 1L), + entry("erroredContainers", 0L) ); } + + @Test + void partialRead() throws Exception { + // Given + // we have a simple read + PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture); + // When + // we write this + SUT.write(channelHandlerContext, msg, channelPromise); + // And + // and we simulate that some one responded + verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any()); + List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues(); + capturedDownstreamContainers.stream().findFirst().map(this::produceReadResponse); + // Then + // We create SUT with 1 seconds timeout + TimeUnit.SECONDS.sleep(2); + // our complete container should complete normally + verify(responseCompletableFuture).completeExceptionally(any()); + // And we should have no memory leak + assertThat(SUT.getStatistics()).containsOnly( + entry("queue", 0), + entry("sentButUnacknowledgedSubContainer", 0), + entry("correlationToParentContainer", 0), + entry("containerCorrelationIdMap", 0), + entry("responsesToBeDelivered", 0), + entry("correlationIdGenerator", 5), + entry("deliveredItems", 1L), + entry("erroredItems", 4L), + entry("deliveredContainers", 0L), + entry("erroredContainers", 1L) + ); + } + + @SuppressWarnings("unchecked") + private Void produceReadResponse(PlcRequestContainer plcRequestContainer) { + InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest(); + String fieldName = request.getFieldNames().iterator().next(); + CompletableFuture responseFuture = plcRequestContainer.getResponseFuture(); + HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>(); + responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class))); + responseFuture.complete(new DefaultPlcReadResponse(request, responseFields)); + return null; + } } @Nested