This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch netty-serial-nio
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/netty-serial-nio by this push:
     new 84fa760  Added first draft of Netty NIO Implementation for Serial Port 
communication + little documentation about writing drivers.
84fa760 is described below

commit 84fa760fa0ee5cc647bbd665598b103ee0f643ac
Author: Julian Feinauer <[email protected]>
AuthorDate: Sun Aug 11 14:22:57 2019 +0200

    Added first draft of Netty NIO Implementation for Serial Port communication 
+ little documentation about writing drivers.
---
 plc4j/protocols/driver-bases/serial/pom.xml        |   6 +
 .../plc4x/java/base/connection/SerialChannel.java  | 269 ++++++++++++++++++---
 .../java/base/connection/SerialChannelHandler.java |  94 +++++++
 .../base/connection/SerialPollingSelector.java     |  97 +++++++-
 .../java/base/connection/SerialSelectionKey.java   |  62 +++++
 .../java/base/connection/SerialSocketChannel.java  |  52 ++--
 .../base/connection/SerialChannelFactoryTest.java  |  54 +++--
 .../developers/writing-driver/writing-driver.adoc  |  57 +++++
 8 files changed, 620 insertions(+), 71 deletions(-)

diff --git a/plc4j/protocols/driver-bases/serial/pom.xml 
b/plc4j/protocols/driver-bases/serial/pom.xml
index 921b02e..b4400d8 100644
--- a/plc4j/protocols/driver-bases/serial/pom.xml
+++ b/plc4j/protocols/driver-bases/serial/pom.xml
@@ -57,6 +57,12 @@
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-protocol-driver-base-tcp</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
index 4074058..9c48cf8 100644
--- 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
+++ 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
@@ -19,21 +19,40 @@
 
 package org.apache.plc4x.java.base.connection;
 
+import com.fazecast.jSerialComm.SerialPort;
+import com.fazecast.jSerialComm.SerialPortDataListener;
+import com.fazecast.jSerialComm.SerialPortEvent;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.AbstractChannel;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.DefaultChannelPipeline;
+import io.netty.channel.EventLoop;
+import io.netty.channel.FileRegion;
+import io.netty.channel.RecvByteBufAllocator;
+import io.netty.channel.VoidChannelPromise;
+import io.netty.channel.jsc.JSerialCommDeviceAddress;
 import io.netty.channel.nio.AbstractNioByteChannel;
 import io.netty.channel.nio.AbstractNioChannel;
 import io.netty.channel.nio.NioEventLoop;
 import io.netty.channel.socket.DuplexChannel;
 import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.SocketAddress;
-import java.nio.channels.*;
-import java.nio.channels.spi.AbstractSelectableChannel;
-import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * TODO write comment
@@ -43,9 +62,20 @@ import java.nio.channels.spi.AbstractSelector;
  */
 public class SerialChannel extends AbstractNioByteChannel implements 
DuplexChannel {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(SerialChannel.class);
+    private final ChannelConfig config;
+
+    private final VoidChannelPromise unsafeVoidPromise = new 
VoidChannelPromise(this, false);
+    private boolean readPending = false; // Did we receive an EOF?
+    private SocketAddress remoteAddress;
+    private boolean active = false;
+    private SerialSelectionKey selectionKey;
+    private SerialChannelHandler comPort;
+
 
     public SerialChannel() {
         this(null, new SerialSocketChannel(new SerialSelectorProvider()));
+        ((SerialSocketChannel) javaChannel()).setChild(this);
     }
 
     /**
@@ -56,6 +86,7 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
      */
     protected SerialChannel(Channel parent, SelectableChannel ch) {
         super(parent, ch);
+        config = new DefaultChannelConfig(this);
     }
 
     @Override
@@ -65,72 +96,102 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
     @Override
     public boolean isInputShutdown() {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelFuture shutdownInput() {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelFuture shutdownInput(ChannelPromise promise) {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public boolean isOutputShutdown() {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelFuture shutdownOutput() {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelFuture shutdownOutput(ChannelPromise promise) {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public boolean isShutdown() {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelFuture shutdown() {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelFuture shutdown(ChannelPromise promise) {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     protected long doWriteFileRegion(FileRegion region) throws Exception {
-        return 0;
+        throw new NotImplementedException("");
     }
 
     @Override
     protected int doReadBytes(ByteBuf buf) throws Exception {
-        return 0;
+        if (!active) {
+            return 0;
+        }
+        // TODO Here we really read the bytes
+        logger.debug("Trying to read bytes from wire...");
+        buf.writeByte(0x01);
+        return 1;
     }
 
     @Override
     protected int doWriteBytes(ByteBuf buf) throws Exception {
-        return 0;
+        throw new NotImplementedException("");
     }
 
     @Override
     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress 
localAddress) throws Exception {
-        return false;
+        this.remoteAddress = remoteAddress;
+        if (!(remoteAddress instanceof JSerialCommDeviceAddress)) {
+            throw new IllegalArgumentException("Socket Address has to be of 
type " + JSerialCommDeviceAddress.class);
+        }
+        logger.debug("Connecting to Socket Address '{}'", 
((JSerialCommDeviceAddress) remoteAddress).value());
+
+        try {
+            // TODO this should take port from remote Address
+            comPort = SerialChannelHandler.DummyHandler.INSTANCE;
+            logger.debug("Using Com Port {}, trying to open port", 
comPort.getIdentifier());
+            if (comPort.open()) {
+                logger.debug("Opened port successful to {}", 
comPort.getIdentifier());
+                comPort.registerSelectionKey(selectionKey);
+
+                this.active = true;
+                return true;
+            } else {
+                logger.debug("Unable to open port {}", 
comPort.getIdentifier());
+                return false;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            this.active = false;
+            return false;
+        }
     }
 
     @Override
     protected void doFinishConnect() throws Exception {
-
+        throw new NotImplementedException("");
     }
 
     @Override
@@ -145,25 +206,28 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
     @Override
     protected void doBind(SocketAddress localAddress) throws Exception {
-
+        throw new NotImplementedException("");
     }
 
     @Override
     protected void doDisconnect() throws Exception {
-
+        throw new NotImplementedException("");
     }
 
     @Override
     public ChannelConfig config() {
-        return null;
+        return this.config;
     }
 
     @Override
     public boolean isActive() {
-        return false;
+        return active;
     }
 
-    private static class SerialNioUnsafe implements NioUnsafe {
+    private class SerialNioUnsafe implements NioUnsafe {
+
+        private RecvByteBufAllocator.Handle recvHandle;
+
         @Override
         public SelectableChannel ch() {
             throw new NotImplementedException("");
@@ -174,9 +238,62 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
             throw new NotImplementedException("");
         }
 
+        // See NioByteUnsafe#read()
         @Override
         public void read() {
-            throw new NotImplementedException("");
+            logger.debug("Reading...");
+            // TODO we should read something here, okay?!
+            final ChannelConfig config = config();
+            final ChannelPipeline pipeline = pipeline();
+            final ByteBufAllocator allocator = config.getAllocator();
+            final RecvByteBufAllocator.Handle allocHandle = 
recvBufAllocHandle();
+            allocHandle.reset(config);
+
+            ByteBuf byteBuf = null;
+            boolean close = false;
+            try {
+                do {
+                    byteBuf = allocHandle.allocate(allocator);
+                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
+                    if (allocHandle.lastBytesRead() <= 0) {
+                        // nothing was read. release the buffer.
+                        byteBuf.release();
+                        byteBuf = null;
+                        close = allocHandle.lastBytesRead() < 0;
+                        if (close) {
+                            // There is nothing left to read as we received an 
EOF.
+                            readPending = false;
+                        }
+                        break;
+                    }
+
+                    allocHandle.incMessagesRead(1);
+                    readPending = false;
+                    pipeline.fireChannelRead(byteBuf);
+                    byteBuf = null;
+                } while (allocHandle.continueReading());
+
+                allocHandle.readComplete();
+                pipeline.fireChannelReadComplete();
+
+                if (close) {
+                    // TODO
+                    //closeOnRead(pipeline);
+                }
+            } catch (Throwable t) {
+                // TODO
+                // handleReadException(pipeline, byteBuf, t, close, 
allocHandle);
+            } finally {
+                // Check if there is a readPending which was not processed yet.
+                // This could be for two reasons:
+                // * The user called Channel.read() or 
ChannelHandlerContext.read() in channelRead(...) method
+                // * The user called Channel.read() or 
ChannelHandlerContext.read() in channelReadComplete(...) method
+                //
+                // See https://github.com/netty/netty/issues/2254
+                if (!readPending && !config.isAutoRead()) {
+                    // TODO
+                }
+            }
         }
 
         @Override
@@ -186,7 +303,10 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
         @Override
         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
-            throw new NotImplementedException("");
+            if (recvHandle == null) {
+                recvHandle = config().getRecvByteBufAllocator().newHandle();
+            }
+            return recvHandle;
         }
 
         @Override
@@ -196,7 +316,7 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
         @Override
         public SocketAddress remoteAddress() {
-            throw new NotImplementedException("");
+            return null;
         }
 
         @Override
@@ -215,8 +335,42 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
                 method.setAccessible(true);
                 SerialPollingSelector selector = (SerialPollingSelector) 
method.invoke(eventLoop);
 
-                selector.register((AbstractSelectableChannel) 
promise.channel(), 0, null);
-            } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                // Register the channel
+                selectionKey = (SerialSelectionKey) ((SerialChannel) 
promise.channel()).javaChannel().register(selector, 0, SerialChannel.this);
+
+                // Set selection key
+                final Field selectionKeyField = 
AbstractNioChannel.class.getDeclaredField("selectionKey");
+                selectionKeyField.setAccessible(true);
+                selectionKeyField.set(SerialChannel.this, selectionKey);
+
+                // Set event loop (again, via reflection)
+                final Field loop = 
AbstractChannel.class.getDeclaredField("eventLoop");
+                loop.setAccessible(true);
+                loop.set(SerialChannel.this, eventLoop);
+
+                // Register Pipeline, if necessary
+                // Ensure we call handlerAdded(...) before we actually notify 
the promise. This is needed as the
+                // user may already fire events through the pipeline in the 
ChannelFutureListener.
+                if (!(pipeline() instanceof DefaultChannelPipeline)) {
+                    throw new IllegalStateException("Pipeline should be of 
Type " + DefaultChannelPipeline.class);
+                }
+                // Again reflection, but has to be done in an event loop
+                eventLoop().execute(() -> {
+                    try {
+                        final Method invokeHandlerAddedIfNeeded = 
DefaultChannelPipeline.class.getDeclaredMethod("invokeHandlerAddedIfNeeded");
+                        invokeHandlerAddedIfNeeded.setAccessible(true);
+
+                        invokeHandlerAddedIfNeeded.invoke(pipeline());
+
+                        pipeline().fireChannelRegistered();
+                    } catch (IllegalAccessException | 
InvocationTargetException | NoSuchMethodException e) {
+                        e.printStackTrace();
+                    }
+                });
+
+                // Return promise
+                promise.setSuccess();
+            } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | ClosedChannelException | NoSuchFieldException e) {
                 e.printStackTrace();
                 throw new NotImplementedException("Should register channel to 
event loop!!!");
             }
@@ -229,7 +383,21 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
         @Override
         public void connect(SocketAddress remoteAddress, SocketAddress 
localAddress, ChannelPromise promise) {
-            throw new NotImplementedException("");
+            eventLoop().execute(() -> {
+                try {
+                    final boolean sucess = doConnect(remoteAddress, 
localAddress);
+                    if (sucess) {
+                        // Send a message to the pipeline
+                        pipeline().fireChannelActive();
+                        // Finally, close the promise
+                        promise.setSuccess();
+                    } else {
+                        promise.setFailure(new RuntimeException("Unable to 
open the com port " + remoteAddress.toString()));
+                    }
+                } catch (Exception e) {
+                    promise.setFailure(e);
+                }
+            });
         }
 
         @Override
@@ -239,6 +407,8 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
         @Override
         public void close(ChannelPromise promise) {
+            logger.debug("Closing the Serial Port '{}'", this.remoteAddress());
+            // TODO this should close the Serial Port
             throw new NotImplementedException("");
         }
 
@@ -253,8 +423,43 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
         }
 
         @Override
-        public void beginRead() {
-            throw new NotImplementedException("");
+        public final void beginRead() {
+            assert eventLoop().inEventLoop();
+
+            if (!isActive()) {
+                return;
+            }
+
+            try {
+                doBeginRead();
+            } catch (final Exception e) {
+                invokeLater(new Runnable() {
+                    @Override
+                    public void run() {
+                        pipeline().fireExceptionCaught(e);
+                    }
+                });
+                close(voidPromise());
+            }
+        }
+
+        private void invokeLater(Runnable task) {
+            try {
+                // This method is used by outbound operation implementations 
to trigger an inbound event later.
+                // They do not trigger an inbound event immediately because an 
outbound operation might have been
+                // triggered by another inbound event handler method.  If 
fired immediately, the call stack
+                // will look like this for example:
+                //
+                //   handlerA.inboundBufferUpdated() - (1) an inbound handler 
method closes a connection.
+                //   -> handlerA.ctx.close()
+                //      -> channel.unsafe.close()
+                //         -> handlerA.channelInactive() - (2) another inbound 
handler method called while in (1) yet
+                //
+                // which means the execution of two inbound handler methods of 
the same handler overlap undesirably.
+                eventLoop().execute(task);
+            } catch (RejectedExecutionException e) {
+                logger.warn("Can't invoke task later as EventLoop rejected 
it", e);
+            }
         }
 
         @Override
@@ -269,7 +474,7 @@ public class SerialChannel extends AbstractNioByteChannel 
implements DuplexChann
 
         @Override
         public ChannelPromise voidPromise() {
-            throw new NotImplementedException("");
+            return unsafeVoidPromise;
         }
 
         @Override
diff --git 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelHandler.java
 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelHandler.java
new file mode 100644
index 0000000..e001458
--- /dev/null
+++ 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelHandler.java
@@ -0,0 +1,94 @@
+package org.apache.plc4x.java.base.connection;
+
+import com.fazecast.jSerialComm.SerialPort;
+import com.fazecast.jSerialComm.SerialPortDataListener;
+import com.fazecast.jSerialComm.SerialPortEvent;
+import io.netty.channel.jsc.JSerialCommDeviceAddress;
+
+import java.net.SocketAddress;
+
+/**
+ * This is a wrapper mostly for testing {@link SerialChannel}, {@link 
SerialPollingSelector},
+ * {@link SerialSelectionKey}, {@link SerialSelectorProvider} and {@link 
SerialSocketChannel}.
+ */
+public abstract class SerialChannelHandler {
+
+    private final SocketAddress address;
+
+    public SerialChannelHandler(SocketAddress address) {
+        this.address = address;
+    }
+
+    abstract boolean open();
+
+    abstract String getIdentifier();
+
+    /**
+     * This method registers the Callback to the SelectionKey / {@link 
java.nio.channels.Selector}
+     * which is necessary to notify the {@link java.nio.channels.Selector} 
about
+     * available data.
+     */
+    abstract void registerSelectionKey(SerialSelectionKey selectionKey);
+
+    public static class DummyHandler extends SerialChannelHandler {
+
+        public static final DummyHandler INSTANCE = new DummyHandler(null);
+
+        private SerialSelectionKey selectionKey;
+
+        public DummyHandler(SocketAddress address) {
+            super(address);
+        }
+
+        @Override public boolean open() {
+            return true;
+        }
+
+        @Override public String getIdentifier() {
+            return null;
+        }
+
+        @Override public void registerSelectionKey(SerialSelectionKey 
selectionKey) {
+            this.selectionKey = selectionKey;
+        }
+
+        public void fireEvent(int readyOp) {
+            ((SerialPollingSelector) this.selectionKey.selector())
+                .addEvent(new 
SerialPollingSelector.SelectorEvent(this.selectionKey, readyOp));
+        }
+    }
+
+
+    public static class SerialPortHandler extends SerialChannelHandler {
+
+        private SerialPort comPort;
+
+        public SerialPortHandler(SocketAddress address) {
+            super(address);
+            comPort = SerialPort.getCommPort(((JSerialCommDeviceAddress) 
address).value());
+        }
+
+        @Override public boolean open() {
+            return comPort.openPort();
+        }
+
+        @Override public String getIdentifier() {
+            return comPort.getDescriptivePortName();
+        }
+
+        @Override public void registerSelectionKey(SerialSelectionKey 
selectionKey) {
+            comPort.addDataListener(new SerialPortDataListener() {
+                @Override
+                public int getListeningEvents() {
+                    return SerialPort.LISTENING_EVENT_DATA_AVAILABLE;
+                }
+
+                @Override
+                public void serialEvent(SerialPortEvent event) {
+                    // TODO notify the selector that something happens
+                    selectionKey.selector().wakeup();
+                }
+            });
+        }
+    }
+}
diff --git 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
index 95c8993..c68524d 100644
--- 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
+++ 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
@@ -19,6 +19,9 @@
 
 package org.apache.plc4x.java.base.connection;
 
+import io.netty.channel.VoidChannelPromise;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.DefaultPromise;
 import org.apache.commons.lang3.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,10 +30,16 @@ import java.io.IOException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelectionKey;
 import java.nio.channels.spi.AbstractSelector;
 import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * TODO write comment
@@ -42,28 +51,77 @@ class SerialPollingSelector extends AbstractSelector {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SerialPollingSelector.class);
 
+    private final List<SelectionKey> registeredChannels;
+    private final Set<SelectorEvent> events = ConcurrentHashMap.newKeySet();
+
+    // Use a Netty Promise
+    private final DefaultEventExecutor executor = new DefaultEventExecutor();
+    private DefaultPromise<Void> selectPromise;
+
+    public static class SelectorEvent {
+
+        private final SelectionKey key;
+
+        private final int event;
+        public SelectorEvent(SelectionKey key, int event) {
+            this.key = key;
+            this.event = event;
+        }
+
+        public SelectionKey getKey() {
+            return this.key;
+        }
+
+        public int getEvent() {
+            return event;
+        }
+
+    }
     public SerialPollingSelector(SelectorProvider selectorProvider) {
         super(selectorProvider);
+        registeredChannels = new ArrayList<>();
     }
 
     @Override
     public Set<SelectionKey> keys() {
-        return new HashSet<>();
+        return new HashSet<>(registeredChannels);
     }
 
+    /**
+     * Returns the keys of all events that are currently in the events queue.
+     * @return a new set containing the key of every queued event
+     */
     @Override
     public Set<SelectionKey> selectedKeys() {
-        return new HashSet<>();
+        return 
events.stream().map(SelectorEvent::getKey).collect(Collectors.toSet());
     }
 
     @Override
     public int selectNow() throws IOException {
-        throw new NotImplementedException("");
+        // throw new NotImplementedException("");
+        logger.debug("selectNow()");
+        // check if one channel is active
+        return events.size();
     }
 
     @Override
     public int select(long timeout) throws IOException {
-        throw new NotImplementedException("");
+        logger.debug("select({})", timeout);
+        if (events.size() > 0) {
+            return events.size();
+        }
+        this.selectPromise = new DefaultPromise<>(executor);
+        try {
+            if (selectPromise.await(timeout)) {
+                logger.debug("Promise was cancelled, new Events should be 
there.");
+            } else {
+                logger.debug("Promise timed out, expecting no new events.");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Was interrupted", e);
+        }
+        return events.size();
     }
 
     @Override
@@ -73,10 +131,28 @@ class SerialPollingSelector extends AbstractSelector {
 
     @Override
     public Selector wakeup() {
-        // TODO do we have to do something here?
+        // throw new NotImplementedException("Not implemented for this 
selector, should not be needed.");
+        // NOOP
         return this;
     }
 
+    public void addEvent(SelectorEvent event) {
+        logger.debug("Adding Event to Selector, canceling Promise...");
+        this.events.add(event);
+        // Add the OP to the SelectionKey
+        ((SerialSelectionKey) event.key).addReadyOp(event.event);
+        // Complete the promise so that a pending select() returns immediately
+        if (!selectPromise.isDone()) {
+            selectPromise.setSuccess(null);
+        } else {
+            logger.debug("Promise is already cancelled, skipping that.");
+        }
+    }
+
+    public void removeEvent(SerialSelectionKey serialSelectionKey) {
+        events.removeIf(event -> event.key.equals(serialSelectionKey));
+    }
+
     @Override
     protected void implCloseSelector() throws IOException {
         // TODO should we do something here?
@@ -88,7 +164,16 @@ class SerialPollingSelector extends AbstractSelector {
         if (!(ch instanceof SerialSocketChannel)) {
             throw new IllegalArgumentException("Given channel has to be of 
type " + SerialSocketChannel.class);
         }
-        throw new NotImplementedException("");
+        final SerialSelectionKey key = new SerialSelectionKey(ch, this, ops);
+        // Attach attr
+        key.attach(att);
+        synchronized (this) {
+            // TODO is this always the case??
+            final int index = registeredChannels.size();
+            registeredChannels.add(key);
+            key.setIndex(index);
+        }
+        return key;
     }
 
 }
diff --git 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectionKey.java
 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectionKey.java
new file mode 100644
index 0000000..1e64b74
--- /dev/null
+++ 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectionKey.java
@@ -0,0 +1,62 @@
+package org.apache.plc4x.java.base.connection;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.AbstractSelectionKey;
+
+class SerialSelectionKey extends AbstractSelectionKey {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SerialSelectionKey.class);
+
+    final SelectableChannel channel;
+    final Selector selector;
+    int index;
+    private volatile int interestOps;
+    private int readyOps;
+
+    SerialSelectionKey(SelectableChannel channel, Selector selector, int 
interestOps) {
+        this.channel = channel;
+        this.selector = selector;
+        this.interestOps = interestOps;
+    }
+
+    public void setIndex(int index) {
+        this.index = index;
+    }
+
+    @Override public SelectableChannel channel() {
+        return this.channel;
+    }
+
+    @Override public Selector selector() {
+        return this.selector;
+    }
+
+    @Override public int interestOps() {
+        return this.interestOps;
+    }
+
+    @Override public SelectionKey interestOps(int ops) {
+        this.interestOps = ops;
+        return this;
+    }
+
+    @Override public int readyOps() {
+        final int ops = this.readyOps;
+        this.readyOps = 0;
+        // Reset events for this here in Selector
+        ((SerialPollingSelector) selector).removeEvent(this);
+        logger.debug("Returning ready operation {}", ops);
+        return ops;
+    }
+
+    public void addReadyOp(int event) {
+        readyOps = readyOps | event;
+        logger.debug("Adding event {} to ready ops, now having ready ops {}", 
event, readyOps);
+    }
+}
diff --git 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
index e1b93bf..3c17605 100644
--- 
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
+++ 
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
@@ -19,6 +19,10 @@
 
 package org.apache.plc4x.java.base.connection;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketAddress;
@@ -36,6 +40,10 @@ import java.util.Set;
  */
 class SerialSocketChannel extends SocketChannel {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(SerialSocketChannel.class);
+
+    private SerialChannel child =  null;
+
     /**
      * Initializes a new instance of this class.
      *
@@ -47,96 +55,104 @@ class SerialSocketChannel extends SocketChannel {
 
     @Override
     public SocketChannel bind(SocketAddress local) throws IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public <T> SocketChannel setOption(SocketOption<T> name, T value) throws 
IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public SocketChannel shutdownInput() throws IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public SocketChannel shutdownOutput() throws IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public Socket socket() {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public boolean isConnected() {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public boolean isConnectionPending() {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public boolean connect(SocketAddress remote) throws IOException {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public boolean finishConnect() throws IOException {
-        return false;
+        throw new NotImplementedException("");
     }
 
     @Override
     public SocketAddress getRemoteAddress() throws IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public int read(ByteBuffer dst) throws IOException {
-        return 0;
+        throw new NotImplementedException("");
     }
 
     @Override
     public long read(ByteBuffer[] dsts, int offset, int length) throws 
IOException {
-        return 0;
+        throw new NotImplementedException("");
     }
 
     @Override
     public int write(ByteBuffer src) throws IOException {
-        return 0;
+        throw new NotImplementedException("");
     }
 
     @Override
     public long write(ByteBuffer[] srcs, int offset, int length) throws 
IOException {
-        return 0;
+        throw new NotImplementedException("");
     }
 
     @Override
     public SocketAddress getLocalAddress() throws IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public <T> T getOption(SocketOption<T> name) throws IOException {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     public Set<SocketOption<?>> supportedOptions() {
-        return null;
+        throw new NotImplementedException("");
     }
 
     @Override
     protected void implCloseSelectableChannel() throws IOException {
-
+        throw new NotImplementedException("");
     }
 
     @Override
     protected void implConfigureBlocking(boolean block) throws IOException {
+        logger.debug("Requesting Blocking mode to '{}'", block ? "blocking" : 
"non blocking");
+    }
+
+    public SerialChannel getChild() {
+        return child;
+    }
 
+    public void setChild(SerialChannel child) {
+        this.child = child;
     }
 }
diff --git 
a/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
 
b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
index b2aa31b..b333cda 100644
--- 
a/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
+++ 
b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
@@ -19,12 +19,21 @@
 
 package org.apache.plc4x.java.base.connection;
 
-import io.netty.channel.ChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.ByteToMessageCodec;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
 
 /**
  * TODO write comment
@@ -34,24 +43,39 @@ import static org.junit.Assert.*;
  */
 public class SerialChannelFactoryTest {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(SerialChannelFactoryTest.class);
+
     @Test
-    public void createChannel() throws PlcConnectionException {
+    public void createChannel() throws PlcConnectionException, 
InterruptedException, UnknownHostException {
         SerialChannelFactory asdf = new SerialChannelFactory("asdf");
-        asdf.createChannel(new ChannelHandler() {
-            @Override
-            public void handlerAdded(ChannelHandlerContext ctx) throws 
Exception {
-
-            }
-
-            @Override
-            public void handlerRemoved(ChannelHandlerContext ctx) throws 
Exception {
-
+        final TcpSocketChannelFactory factory = new 
TcpSocketChannelFactory(InetAddress.getLocalHost(), 5432);
+        final Channel channel = asdf.createChannel(new 
ChannelInitializer<SerialChannel>() {
+            @Override protected void initChannel(SerialChannel ch) throws 
Exception {
+                ch.pipeline().addLast(new DemoCodec());
             }
+        });
+        Thread.sleep(100);
+        for (int i = 1; i <= 10; i++) {
+            Thread.sleep(10);
+            SerialChannelHandler.DummyHandler.INSTANCE.fireEvent(1);
+        }
+        Thread.sleep(100);
+        channel.close().sync();
+    }
 
-            @Override
-            public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
+    private static class DemoCodec extends ByteToMessageCodec<Object> {
+        @Override protected void encode(ChannelHandlerContext 
channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
+            // do nothing here
+        }
 
+        @Override protected void decode(ChannelHandlerContext 
channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+            byteBuf.markReaderIndex();
+            StringBuffer sb = new StringBuffer();
+            for (int i = 1; i <= byteBuf.readableBytes(); i++) {
+                sb.append(byteBuf.readByte() + ", ");
             }
-        });
+            byteBuf.resetReaderIndex();
+            logger.debug("We currently have {} readable bytes: {}", 
byteBuf.readableBytes(), sb.toString());
+        }
     }
 }
\ No newline at end of file
diff --git a/src/site/asciidoc/developers/writing-driver/writing-driver.adoc 
b/src/site/asciidoc/developers/writing-driver/writing-driver.adoc
new file mode 100644
index 0000000..f071fac
--- /dev/null
+++ b/src/site/asciidoc/developers/writing-driver/writing-driver.adoc
@@ -0,0 +1,57 @@
+//
+//  Licensed to the Apache Software Foundation (ASF) under one or more
+//  contributor license agreements.  See the NOTICE file distributed with
+//  this work for additional information regarding copyright ownership.
+//  The ASF licenses this file to You under the Apache License, Version 2.0
+//  (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//
+:imagesdir: ../../images/
+
+== Basic Building Blocks of a Driver / Protocol
+
+The general pipeline for a Protocol looks like the following:
+
+[ditaa,driver-anatomy]
+....
++-------------------------------------+
+|  Upstream (e.g. PLC4X API)          :
++-------------------------------------+
+                  |
+                  :
+                  v
++-------------------------------------+
+|  PLC4X Optimizer                    :
+|  Possibly collapse requests         |
++-------------------------------------+
+                  |
+                  :
+                  v
++-------------------------------------+
+|  Protocol L1                        |
+|  Req. to (generated) Message Objects|
++-------------------------------------+
+                  |
+                  |
+                  v
++-------------------------------------+
+|  Protocol L2                        |
+|  Generated Objects to bytes         |
++-------------------------------------+
+                  |
+                  |
+                  v
++-------------------------------------+
+|  Downstream (e.g. Socket for I/O)   :
++-------------------------------------+
+....
+
+A driver's task is to assemble the pipeline for the protocol(s).
\ No newline at end of file

Reply via email to