This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch feature/TopLevelItemSpliting in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 10cf16a7c47d113d88f5d19ada479475442bd0fc Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu Jul 5 13:20:43 2018 +0200 Introduced protocol layer to split requestItems to several requests. --- .../base/messages/item/CorrelatedRequestItem.java | 81 +++++++++ .../base/messages/item/CorrelatedResponseItem.java | 70 ++++++++ .../SingleItemToSingleRequestProtocol.java | 193 +++++++++++++++++++++ 3 files changed, 344 insertions(+) diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java new file mode 100644 index 0000000..5212618 --- /dev/null +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java @@ -0,0 +1,81 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + */ +package org.apache.plc4x.java.base.messages.item; + +import org.apache.plc4x.java.api.messages.PlcRequestContainer; +import org.apache.plc4x.java.api.messages.PlcResponse; +import org.apache.plc4x.java.api.messages.items.RequestItem; + +import java.util.Objects; + +public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> { + + private final int correlationId; + + private final REQUEST_ITEM requestItem; + + private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer; + + public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) { + this.correlationId = correlationId; + this.requestItem = requestItem; + this.plcRequestContainer = plcRequestContainer; + } + + public int getCorrelationId() { + return correlationId; + } + + public REQUEST_ITEM getRequestItem() { + return requestItem; + } + + public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() { + return plcRequestContainer; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CorrelatedRequestItem)) { + return false; + } + CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o; + return correlationId == that.correlationId && + Objects.equals(requestItem, that.requestItem) && + Objects.equals(plcRequestContainer, that.plcRequestContainer); + } + + @Override + public int hashCode() { + + return Objects.hash(correlationId, requestItem, plcRequestContainer); + } + + @Override + public String toString() { + return "CorrelatedRequestItem{" + + "correlationId=" + correlationId + + ", requestItem=" + requestItem + + ", plcRequestContainer=" + plcRequestContainer + + '}'; + } +} diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java new file mode 100644 index 0000000..38a9032 --- /dev/null +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java @@ -0,0 +1,70 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + */ +package org.apache.plc4x.java.base.messages.item; + +import org.apache.plc4x.java.api.messages.items.ResponseItem; + +import java.util.Objects; + +public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> { + + private final int correlationId; + + private final RESPONSE_ITEM responseItem; + + public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) { + this.correlationId = correlationId; + this.responseItem = responseItem; + } + + public int getCorrelationId() { + return correlationId; + } + + public RESPONSE_ITEM getResponseItem() { + return responseItem; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CorrelatedResponseItem)) { + return false; + } + CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o; + return correlationId == that.correlationId && + Objects.equals(responseItem, that.responseItem); + } + + @Override + public int hashCode() { + + return Objects.hash(correlationId, responseItem); + } + + @Override + public String toString() { + return "CorrelatedResponseItem{" + + "correlationId=" + correlationId + + ", responseItem=" + responseItem + + '}'; + } +} diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java new file mode 100644 index 0000000..8c0e272 --- /dev/null +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java @@ -0,0 +1,193 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + */ +package org.apache.plc4x.java.base.protocol; + +import io.netty.channel.*; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.PromiseCombiner; +import org.apache.plc4x.java.api.exceptions.PlcProtocolException; +import org.apache.plc4x.java.api.messages.*; +import org.apache.plc4x.java.api.messages.items.RequestItem; +import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest; +import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse; +import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest; +import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse; +import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem; +import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { + + public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class); + + private PendingWriteQueue queue; + + private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems; + + private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap; + + private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered; + + private AtomicInteger correlationId; + + private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() { + + @Override + protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception { + SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out); + } + }; + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + this.queue = new PendingWriteQueue(ctx); + this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>(); + this.containerCorrelationIdMap = new ConcurrentHashMap<>(); + this.correlationId = new AtomicInteger(); + super.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + this.queue.removeAndWriteAll(); + super.channelUnregistered(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // Send everything so we get a proper failure for those pending writes + this.queue.removeAndWriteAll(); + super.channelInactive(ctx); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Decoding + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + decoder.channelRead(ctx, msg); + super.read(ctx); + } + + private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException { + int correlationId = msg.getCorrelationId(); + CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId); + if (correlatedRequestItem == null) { + throw new PlcProtocolException("Unrelated package received " + msg); + } + PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer(); + List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>()); + correlatedResponseItems.add(msg); + Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer); + integers.remove(correlationId); + if (integers.isEmpty()) { + PlcResponse<?, ?, ?> plcResponse; + if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) { + TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest(); + plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); + } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) { + plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); + } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) { + plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); + } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) { + plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); + } else { + throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest()); + } + plcRequestContainer.getResponseFuture().complete(plcResponse); + responsesToBeDevliered.remove(plcRequestContainer); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Encoding + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof PlcRequestContainer) { + PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg; + Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>()); + + // Create a promise that has to be called multiple times. + PromiseCombiner promiseCombiner = new PromiseCombiner(); + PlcRequest<?> request = in.getRequest(); + for (RequestItem<?> item : request.getRequestItems()) { + ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); + + int tdpu = correlationId.getAndIncrement(); + queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise); + if (!tdpus.add(tdpu)) { + throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu); + } + promiseCombiner.add((Future) subPromise); + } + promiseCombiner.finish(promise); + + // Start sending the queue content. + trySendingMessages(ctx); + } else { + super.write(ctx, msg, promise); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Helpers + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + protected synchronized void trySendingMessages(ChannelHandlerContext ctx) { + while (queue.size() > 0) { + // Get the RequestItem that is up next in the queue. + CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current(); + + if (currentItem == null) { + break; + } + // Send the TPDU. + try { + ChannelFuture channelFuture = queue.removeAndWrite(); + ctx.flush(); + if (channelFuture == null) { + break; + } + } catch (Exception e) { + LOGGER.error("Error sending more queues messages", e); + ctx.fireExceptionCaught(e); + } + + // Add it to the list of sentButUnacknowledgedRequestItems. + sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem); + + LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId()); + } + ctx.flush(); + } +}