This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push: new 3a77612 implement proper serial communication using locks for now 3a77612 is described below commit 3a77612411709ccc5c61f9e3b0a754e293709f3f Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Mon May 14 14:53:26 2018 +0200 implement proper serial communication using locks for now --- .../java/ads/protocol/Payload2SerialProtocol.java | 52 +++++++++++++++++++--- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java index 4ef0fab..2e4cc37 100644 --- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java +++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java @@ -23,6 +23,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.ScheduledFuture; import org.apache.plc4x.java.ads.api.serial.AmsSerialAcknowledgeFrame; import org.apache.plc4x.java.ads.api.serial.AmsSerialFrame; import org.apache.plc4x.java.ads.api.serial.AmsSerialResetFrame; @@ -33,16 +35,52 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class Payload2SerialProtocol extends MessageToMessageCodec<ByteBuf, ByteBuf> { private static final Logger LOGGER = LoggerFactory.getLogger(Payload2TcpProtocol.class); + private final AtomicInteger fragmentCounter = new AtomicInteger(0); + + private final AtomicBoolean frameOnTheWay = new AtomicBoolean(false); + + private volatile ScheduledFuture<ChannelFuture> retryHandler; + + private final Lock lock = new ReentrantLock(true); + @Override protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf amsPacket, List<Object> out) throws Exception { - out.add(AmsSerialFrame.of(FragmentNumber.of((byte) 0), UserData.of(amsPacket)).getByteBuf()); - // TODO: we need to remember the fragment and maybe even need to spilt up the package - // TODO: if we exceed 255 byte + while (frameOnTheWay.get() || !lock.tryLock()) { + // In this case we might not send it yet. + TimeUnit.MILLISECONDS.sleep(10); + } + int fragmentNumber = fragmentCounter.getAndIncrement(); + if (fragmentNumber > 255) { + fragmentNumber = 0; + fragmentCounter.set(fragmentNumber); + } + try { + // TODO: we need to remember the fragment and maybe even need to spilt up the package + // TODO: if we exceed 255 byte + AmsSerialFrame amsSerialFrame = AmsSerialFrame.of(FragmentNumber.of((byte) fragmentNumber), UserData.of(amsPacket)); + out.add(amsSerialFrame.getByteBuf()); + retryHandler = channelHandlerContext.executor().schedule(() -> { + try { + TimeUnit.SECONDS.sleep(2); + channelHandlerContext.channel().writeAndFlush(amsSerialFrame.getByteBuf()); + } catch (InterruptedException ignore) { + } + return channelHandlerContext.voidPromise(); + }, 0, TimeUnit.MILLISECONDS); + frameOnTheWay.set(true); + } finally { + lock.unlock(); + } } @Override @@ -70,19 +108,23 @@ public class Payload2SerialProtocol extends MessageToMessageCodec<ByteBuf, ByteB // TODO: check if this is the right way to ack a package. ChannelFuture channelFuture = channelHandlerContext.writeAndFlush(AmsSerialAcknowledgeFrame.of(transmitterAddress, receiverAddress, fragmentNumber)); channelFuture.addListener((ChannelFutureListener) future -> { - // TODO: we might wait for the ack-frame to be transmitted before we forward the package + frameOnTheWay.set(false); }); - // TODO: check if this await is right as this might be against the concept of netty + // waiting for the ack-frame to be transmitted before we forward the package channelFuture.await(); out.add(userData.getByteBuf()); break; case AmsSerialAcknowledgeFrame.ID: AmsSerialAcknowledgeFrame amsSerialAcknowledgeFrame = AmsSerialAcknowledgeFrame.of(magicCookie, transmitterAddress, receiverAddress, fragmentNumber, userDataLength, crc); LOGGER.debug("Ams Serial ACK Frame received {}", amsSerialAcknowledgeFrame); + retryHandler.cancel(true); + ReferenceCountUtil.release(byteBuf); break; case AmsSerialResetFrame.ID: + // TODO: how to react to a reset AmsSerialResetFrame amsSerialResetFrame = AmsSerialResetFrame.of(magicCookie, transmitterAddress, receiverAddress, fragmentNumber, userDataLength, crc); LOGGER.debug("Ams Serial Reset Frame received {}", amsSerialResetFrame); + ReferenceCountUtil.release(byteBuf); break; } CRC calculatedCrc = CRC.of(DigestUtil.calculateCrc16(magicCookie, transmitterAddress, receiverAddress, fragmentNumber, userDataLength, userData)); -- To stop receiving notification emails like this one, please contact sru...@apache.org.