This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a77612  implement proper serial communication using locks for now
3a77612 is described below

commit 3a77612411709ccc5c61f9e3b0a754e293709f3f
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Mon May 14 14:53:26 2018 +0200

    implement proper serial communication using locks for now
---
 .../java/ads/protocol/Payload2SerialProtocol.java  | 52 +++++++++++++++++++---
 1 file changed, 47 insertions(+), 5 deletions(-)

diff --git 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java
 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java
index 4ef0fab..2e4cc37 100644
--- 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java
+++ 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Payload2SerialProtocol.java
@@ -23,6 +23,8 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageCodec;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.ScheduledFuture;
 import org.apache.plc4x.java.ads.api.serial.AmsSerialAcknowledgeFrame;
 import org.apache.plc4x.java.ads.api.serial.AmsSerialFrame;
 import org.apache.plc4x.java.ads.api.serial.AmsSerialResetFrame;
@@ -33,16 +35,52 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class Payload2SerialProtocol extends MessageToMessageCodec<ByteBuf, 
ByteBuf> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(Payload2TcpProtocol.class);
 
+    private final AtomicInteger fragmentCounter = new AtomicInteger(0);
+
+    private final AtomicBoolean frameOnTheWay = new AtomicBoolean(false);
+
+    private volatile ScheduledFuture<ChannelFuture> retryHandler;
+
+    private final Lock lock = new ReentrantLock(true);
+
     @Override
     protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf 
amsPacket, List<Object> out) throws Exception {
-        out.add(AmsSerialFrame.of(FragmentNumber.of((byte) 0), 
UserData.of(amsPacket)).getByteBuf());
-        // TODO: we need to remember the fragment and maybe even need to spilt 
up the package
-        // TODO: if we exceed 255 byte
+        while (frameOnTheWay.get() || !lock.tryLock()) {
+            // In this case we might not send it yet.
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        int fragmentNumber = fragmentCounter.getAndIncrement();
+        if (fragmentNumber > 255) {
+            fragmentNumber = 0;
+            fragmentCounter.set(fragmentNumber);
+        }
+        try {
+            // TODO: we need to remember the fragment and maybe even need to 
spilt up the package
+            // TODO: if we exceed 255 byte
+            AmsSerialFrame amsSerialFrame = 
AmsSerialFrame.of(FragmentNumber.of((byte) fragmentNumber), 
UserData.of(amsPacket));
+            out.add(amsSerialFrame.getByteBuf());
+            retryHandler = channelHandlerContext.executor().schedule(() -> {
+                try {
+                    TimeUnit.SECONDS.sleep(2);
+                    
channelHandlerContext.channel().writeAndFlush(amsSerialFrame.getByteBuf());
+                } catch (InterruptedException ignore) {
+                }
+                return channelHandlerContext.voidPromise();
+            }, 0, TimeUnit.MILLISECONDS);
+            frameOnTheWay.set(true);
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
@@ -70,19 +108,23 @@ public class Payload2SerialProtocol extends 
MessageToMessageCodec<ByteBuf, ByteB
                 // TODO: check if this is the right way to ack a package.
                 ChannelFuture channelFuture = 
channelHandlerContext.writeAndFlush(AmsSerialAcknowledgeFrame.of(transmitterAddress,
 receiverAddress, fragmentNumber));
                 channelFuture.addListener((ChannelFutureListener) future -> {
-                    // TODO: we might wait for the ack-frame to be transmitted 
before we forward the package
+                    frameOnTheWay.set(false);
                 });
-                // TODO: check if this await is right as this might be against 
the concept of netty
+                // waiting for the ack-frame to be transmitted before we 
forward the package
                 channelFuture.await();
                 out.add(userData.getByteBuf());
                 break;
             case AmsSerialAcknowledgeFrame.ID:
                 AmsSerialAcknowledgeFrame amsSerialAcknowledgeFrame = 
AmsSerialAcknowledgeFrame.of(magicCookie, transmitterAddress, receiverAddress, 
fragmentNumber, userDataLength, crc);
                 LOGGER.debug("Ams Serial ACK Frame received {}", 
amsSerialAcknowledgeFrame);
+                retryHandler.cancel(true);
+                ReferenceCountUtil.release(byteBuf);
                 break;
             case AmsSerialResetFrame.ID:
+                // TODO: how to react to a reset
                 AmsSerialResetFrame amsSerialResetFrame = 
AmsSerialResetFrame.of(magicCookie, transmitterAddress, receiverAddress, 
fragmentNumber, userDataLength, crc);
                 LOGGER.debug("Ams Serial Reset Frame received {}", 
amsSerialResetFrame);
+                ReferenceCountUtil.release(byteBuf);
                 break;
         }
         CRC calculatedCrc = CRC.of(DigestUtil.calculateCrc16(magicCookie, 
transmitterAddress, receiverAddress, fragmentNumber, userDataLength, userData));

-- 
To stop receiving notification emails like this one, please contact
sru...@apache.org.

Reply via email to