This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch netty-serial-nio
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/netty-serial-nio by this push:
new 84fa760 Added first draft of Netty NIO Implementation for Serial Port
communication + little documentation about writing drivers.
84fa760 is described below
commit 84fa760fa0ee5cc647bbd665598b103ee0f643ac
Author: Julian Feinauer <[email protected]>
AuthorDate: Sun Aug 11 14:22:57 2019 +0200
Added first draft of Netty NIO Implementation for Serial Port communication
+ little documentation about writing drivers.
---
plc4j/protocols/driver-bases/serial/pom.xml | 6 +
.../plc4x/java/base/connection/SerialChannel.java | 269 ++++++++++++++++++---
.../java/base/connection/SerialChannelHandler.java | 94 +++++++
.../base/connection/SerialPollingSelector.java | 97 +++++++-
.../java/base/connection/SerialSelectionKey.java | 62 +++++
.../java/base/connection/SerialSocketChannel.java | 52 ++--
.../base/connection/SerialChannelFactoryTest.java | 54 +++--
.../developers/writing-driver/writing-driver.adoc | 57 +++++
8 files changed, 620 insertions(+), 71 deletions(-)
diff --git a/plc4j/protocols/driver-bases/serial/pom.xml
b/plc4j/protocols/driver-bases/serial/pom.xml
index 921b02e..b4400d8 100644
--- a/plc4j/protocols/driver-bases/serial/pom.xml
+++ b/plc4j/protocols/driver-bases/serial/pom.xml
@@ -57,6 +57,12 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-protocol-driver-base-tcp</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
index 4074058..9c48cf8 100644
---
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
+++
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
@@ -19,21 +19,40 @@
package org.apache.plc4x.java.base.connection;
+import com.fazecast.jSerialComm.SerialPort;
+import com.fazecast.jSerialComm.SerialPortDataListener;
+import com.fazecast.jSerialComm.SerialPortEvent;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.DefaultChannelPipeline;
+import io.netty.channel.EventLoop;
+import io.netty.channel.FileRegion;
+import io.netty.channel.RecvByteBufAllocator;
+import io.netty.channel.VoidChannelPromise;
+import io.netty.channel.jsc.JSerialCommDeviceAddress;
import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.nio.AbstractNioChannel;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.socket.DuplexChannel;
import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketAddress;
-import java.nio.channels.*;
-import java.nio.channels.spi.AbstractSelectableChannel;
-import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.RejectedExecutionException;
/**
* TODO write comment
@@ -43,9 +62,20 @@ import java.nio.channels.spi.AbstractSelector;
*/
public class SerialChannel extends AbstractNioByteChannel implements
DuplexChannel {
+ private static final Logger logger =
LoggerFactory.getLogger(SerialChannel.class);
+ private final ChannelConfig config;
+
+ private final VoidChannelPromise unsafeVoidPromise = new
VoidChannelPromise(this, false);
+ private boolean readPending = false; // Did we receive an EOF?
+ private SocketAddress remoteAddress;
+ private boolean active = false;
+ private SerialSelectionKey selectionKey;
+ private SerialChannelHandler comPort;
+
public SerialChannel() {
this(null, new SerialSocketChannel(new SerialSelectorProvider()));
+ ((SerialSocketChannel) javaChannel()).setChild(this);
}
/**
@@ -56,6 +86,7 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
*/
protected SerialChannel(Channel parent, SelectableChannel ch) {
super(parent, ch);
+ config = new DefaultChannelConfig(this);
}
@Override
@@ -65,72 +96,102 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
public boolean isInputShutdown() {
- return false;
+ throw new NotImplementedException("");
}
@Override
public ChannelFuture shutdownInput() {
- return null;
+ throw new NotImplementedException("");
}
@Override
public ChannelFuture shutdownInput(ChannelPromise promise) {
- return null;
+ throw new NotImplementedException("");
}
@Override
public boolean isOutputShutdown() {
- return false;
+ throw new NotImplementedException("");
}
@Override
public ChannelFuture shutdownOutput() {
- return null;
+ throw new NotImplementedException("");
}
@Override
public ChannelFuture shutdownOutput(ChannelPromise promise) {
- return null;
+ throw new NotImplementedException("");
}
@Override
public boolean isShutdown() {
- return false;
+ throw new NotImplementedException("");
}
@Override
public ChannelFuture shutdown() {
- return null;
+ throw new NotImplementedException("");
}
@Override
public ChannelFuture shutdown(ChannelPromise promise) {
- return null;
+ throw new NotImplementedException("");
}
@Override
protected long doWriteFileRegion(FileRegion region) throws Exception {
- return 0;
+ throw new NotImplementedException("");
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
- return 0;
+ if (!active) {
+ return 0;
+ }
+ // TODO Here we really read the bytes
+ logger.debug("Trying to read bytes from wire...");
+ buf.writeByte(0x01);
+ return 1;
}
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
- return 0;
+ throw new NotImplementedException("");
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress
localAddress) throws Exception {
- return false;
+ this.remoteAddress = remoteAddress;
+ if (!(remoteAddress instanceof JSerialCommDeviceAddress)) {
+ throw new IllegalArgumentException("Socket Address has to be of
type " + JSerialCommDeviceAddress.class);
+ }
+ logger.debug("Connecting to Socket Address '{}'",
((JSerialCommDeviceAddress) remoteAddress).value());
+
+ try {
+ // TODO this should take the port from the remote address
+ comPort = SerialChannelHandler.DummyHandler.INSTANCE;
+ logger.debug("Using Com Port {}, trying to open port",
comPort.getIdentifier());
+ if (comPort.open()) {
+ logger.debug("Opened port successful to {}",
comPort.getIdentifier());
+ comPort.registerSelectionKey(selectionKey);
+
+ this.active = true;
+ return true;
+ } else {
+ logger.debug("Unable to open port {}",
comPort.getIdentifier());
+ return false;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ this.active = false;
+ return false;
+ }
}
@Override
protected void doFinishConnect() throws Exception {
-
+ throw new NotImplementedException("");
}
@Override
@@ -145,25 +206,28 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
-
+ throw new NotImplementedException("");
}
@Override
protected void doDisconnect() throws Exception {
-
+ throw new NotImplementedException("");
}
@Override
public ChannelConfig config() {
- return null;
+ return this.config;
}
@Override
public boolean isActive() {
- return false;
+ return active;
}
- private static class SerialNioUnsafe implements NioUnsafe {
+ private class SerialNioUnsafe implements NioUnsafe {
+
+ private RecvByteBufAllocator.Handle recvHandle;
+
@Override
public SelectableChannel ch() {
throw new NotImplementedException("");
@@ -174,9 +238,62 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
throw new NotImplementedException("");
}
+ // See NioByteUnsafe#read()
@Override
public void read() {
- throw new NotImplementedException("");
+ logger.debug("Reading...");
+ // TODO we should read something here, okay?!
+ final ChannelConfig config = config();
+ final ChannelPipeline pipeline = pipeline();
+ final ByteBufAllocator allocator = config.getAllocator();
+ final RecvByteBufAllocator.Handle allocHandle =
recvBufAllocHandle();
+ allocHandle.reset(config);
+
+ ByteBuf byteBuf = null;
+ boolean close = false;
+ try {
+ do {
+ byteBuf = allocHandle.allocate(allocator);
+ allocHandle.lastBytesRead(doReadBytes(byteBuf));
+ if (allocHandle.lastBytesRead() <= 0) {
+ // nothing was read. release the buffer.
+ byteBuf.release();
+ byteBuf = null;
+ close = allocHandle.lastBytesRead() < 0;
+ if (close) {
+ // There is nothing left to read as we received an
EOF.
+ readPending = false;
+ }
+ break;
+ }
+
+ allocHandle.incMessagesRead(1);
+ readPending = false;
+ pipeline.fireChannelRead(byteBuf);
+ byteBuf = null;
+ } while (allocHandle.continueReading());
+
+ allocHandle.readComplete();
+ pipeline.fireChannelReadComplete();
+
+ if (close) {
+ // TODO
+ //closeOnRead(pipeline);
+ }
+ } catch (Throwable t) {
+ // TODO
+ // handleReadException(pipeline, byteBuf, t, close,
allocHandle);
+ } finally {
+ // Check if there is a readPending which was not processed yet.
+ // This could be for two reasons:
+ // * The user called Channel.read() or
ChannelHandlerContext.read() in channelRead(...) method
+ // * The user called Channel.read() or
ChannelHandlerContext.read() in channelReadComplete(...) method
+ //
+ // See https://github.com/netty/netty/issues/2254
+ if (!readPending && !config.isAutoRead()) {
+ // TODO
+ }
+ }
}
@Override
@@ -186,7 +303,10 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
- throw new NotImplementedException("");
+ if (recvHandle == null) {
+ recvHandle = config().getRecvByteBufAllocator().newHandle();
+ }
+ return recvHandle;
}
@Override
@@ -196,7 +316,7 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
public SocketAddress remoteAddress() {
- throw new NotImplementedException("");
+ return null;
}
@Override
@@ -215,8 +335,42 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
method.setAccessible(true);
SerialPollingSelector selector = (SerialPollingSelector)
method.invoke(eventLoop);
- selector.register((AbstractSelectableChannel)
promise.channel(), 0, null);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ // Register the channel
+ selectionKey = (SerialSelectionKey) ((SerialChannel)
promise.channel()).javaChannel().register(selector, 0, SerialChannel.this);
+
+ // Set selection key
+ final Field selectionKeyField =
AbstractNioChannel.class.getDeclaredField("selectionKey");
+ selectionKeyField.setAccessible(true);
+ selectionKeyField.set(SerialChannel.this, selectionKey);
+
+ // Set event loop (again, via reflection)
+ final Field loop =
AbstractChannel.class.getDeclaredField("eventLoop");
+ loop.setAccessible(true);
+ loop.set(SerialChannel.this, eventLoop);
+
+ // Register Pipeline, if necessary
+ // Ensure we call handlerAdded(...) before we actually notify
the promise. This is needed as the
+ // user may already fire events through the pipeline in the
ChannelFutureListener.
+ if (!(pipeline() instanceof DefaultChannelPipeline)) {
+ throw new IllegalStateException("Pipeline should be of
Type " + DefaultChannelPipeline.class);
+ }
+ // Again reflection, but has to be done in an event loop
+ eventLoop().execute(() -> {
+ try {
+ final Method invokeHandlerAddedIfNeeded =
DefaultChannelPipeline.class.getDeclaredMethod("invokeHandlerAddedIfNeeded");
+ invokeHandlerAddedIfNeeded.setAccessible(true);
+
+ invokeHandlerAddedIfNeeded.invoke(pipeline());
+
+ pipeline().fireChannelRegistered();
+ } catch (IllegalAccessException |
InvocationTargetException | NoSuchMethodException e) {
+ e.printStackTrace();
+ }
+ });
+
+ // Return promise
+ promise.setSuccess();
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | ClosedChannelException | NoSuchFieldException e) {
e.printStackTrace();
throw new NotImplementedException("Should register channel to
event loop!!!");
}
@@ -229,7 +383,21 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
public void connect(SocketAddress remoteAddress, SocketAddress
localAddress, ChannelPromise promise) {
- throw new NotImplementedException("");
+ eventLoop().execute(() -> {
+ try {
+ final boolean sucess = doConnect(remoteAddress,
localAddress);
+ if (sucess) {
+ // Send a message to the pipeline
+ pipeline().fireChannelActive();
+ // Finally, close the promise
+ promise.setSuccess();
+ } else {
+ promise.setFailure(new RuntimeException("Unable to
open the com port " + remoteAddress.toString()));
+ }
+ } catch (Exception e) {
+ promise.setFailure(e);
+ }
+ });
}
@Override
@@ -239,6 +407,8 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
public void close(ChannelPromise promise) {
+ logger.debug("Closing the Serial Port '{}'", this.remoteAddress());
+ // TODO this should close the Serial Port
throw new NotImplementedException("");
}
@@ -253,8 +423,43 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
}
@Override
- public void beginRead() {
- throw new NotImplementedException("");
+ public final void beginRead() {
+ assert eventLoop().inEventLoop();
+
+ if (!isActive()) {
+ return;
+ }
+
+ try {
+ doBeginRead();
+ } catch (final Exception e) {
+ invokeLater(new Runnable() {
+ @Override
+ public void run() {
+ pipeline().fireExceptionCaught(e);
+ }
+ });
+ close(voidPromise());
+ }
+ }
+
+ private void invokeLater(Runnable task) {
+ try {
+ // This method is used by outbound operation implementations
to trigger an inbound event later.
+ // They do not trigger an inbound event immediately because an
outbound operation might have been
+ // triggered by another inbound event handler method. If
fired immediately, the call stack
+ // will look like this for example:
+ //
+ // handlerA.inboundBufferUpdated() - (1) an inbound handler
method closes a connection.
+ // -> handlerA.ctx.close()
+ // -> channel.unsafe.close()
+ // -> handlerA.channelInactive() - (2) another inbound
handler method called while in (1) yet
+ //
+ // which means the execution of two inbound handler methods of
the same handler overlap undesirably.
+ eventLoop().execute(task);
+ } catch (RejectedExecutionException e) {
+ logger.warn("Can't invoke task later as EventLoop rejected
it", e);
+ }
}
@Override
@@ -269,7 +474,7 @@ public class SerialChannel extends AbstractNioByteChannel
implements DuplexChann
@Override
public ChannelPromise voidPromise() {
- throw new NotImplementedException("");
+ return unsafeVoidPromise;
}
@Override
diff --git
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelHandler.java
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelHandler.java
new file mode 100644
index 0000000..e001458
--- /dev/null
+++
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelHandler.java
@@ -0,0 +1,94 @@
+package org.apache.plc4x.java.base.connection;
+
+import com.fazecast.jSerialComm.SerialPort;
+import com.fazecast.jSerialComm.SerialPortDataListener;
+import com.fazecast.jSerialComm.SerialPortEvent;
+import io.netty.channel.jsc.JSerialCommDeviceAddress;
+
+import java.net.SocketAddress;
+
+/**
+ * This is a wrapper mostly for testing {@link SerialChannel}, {@link
SerialPollingSelector},
+ * {@link SerialSelectionKey}, {@link SerialSelectorProvider} and {@link
SerialSocketChannel}.
+ */
+public abstract class SerialChannelHandler {
+
+ private final SocketAddress address;
+
+ public SerialChannelHandler(SocketAddress address) {
+ this.address = address;
+ }
+
+ abstract boolean open();
+
+ abstract String getIdentifier();
+
+ /**
+ * This method registers the Callback to the SelectionKey / {@link
java.nio.channels.Selector}
+ * which is necessary to notify the {@link java.nio.channels.Selector}
about
+ * available data.
+ */
+ abstract void registerSelectionKey(SerialSelectionKey selectionKey);
+
+ public static class DummyHandler extends SerialChannelHandler {
+
+ public static final DummyHandler INSTANCE = new DummyHandler(null);
+
+ private SerialSelectionKey selectionKey;
+
+ public DummyHandler(SocketAddress address) {
+ super(address);
+ }
+
+ @Override public boolean open() {
+ return true;
+ }
+
+ @Override public String getIdentifier() {
+ return null;
+ }
+
+ @Override public void registerSelectionKey(SerialSelectionKey
selectionKey) {
+ this.selectionKey = selectionKey;
+ }
+
+ public void fireEvent(int readyOp) {
+ ((SerialPollingSelector) this.selectionKey.selector())
+ .addEvent(new
SerialPollingSelector.SelectorEvent(this.selectionKey, readyOp));
+ }
+ }
+
+
+ public static class SerialPortHandler extends SerialChannelHandler {
+
+ private SerialPort comPort;
+
+ public SerialPortHandler(SocketAddress address) {
+ super(address);
+ comPort = SerialPort.getCommPort(((JSerialCommDeviceAddress)
address).value());
+ }
+
+ @Override public boolean open() {
+ return comPort.openPort();
+ }
+
+ @Override public String getIdentifier() {
+ return comPort.getDescriptivePortName();
+ }
+
+ @Override public void registerSelectionKey(SerialSelectionKey
selectionKey) {
+ comPort.addDataListener(new SerialPortDataListener() {
+ @Override
+ public int getListeningEvents() {
+ return SerialPort.LISTENING_EVENT_DATA_AVAILABLE;
+ }
+
+ @Override
+ public void serialEvent(SerialPortEvent event) {
+ // TODO notify the selector that something happens
+ selectionKey.selector().wakeup();
+ }
+ });
+ }
+ }
+}
diff --git
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
index 95c8993..c68524d 100644
---
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
+++
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
@@ -19,6 +19,9 @@
package org.apache.plc4x.java.base.connection;
+import io.netty.channel.VoidChannelPromise;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.DefaultPromise;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,10 +30,16 @@ import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelectionKey;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/**
* TODO write comment
@@ -42,28 +51,77 @@ class SerialPollingSelector extends AbstractSelector {
private static final Logger logger =
LoggerFactory.getLogger(SerialPollingSelector.class);
+ private final List<SelectionKey> registeredChannels;
+ private final Set<SelectorEvent> events = ConcurrentHashMap.newKeySet();
+
+ // Use a Netty Promise
+ private final DefaultEventExecutor executor = new DefaultEventExecutor();
+ private DefaultPromise<Void> selectPromise;
+
+ public static class SelectorEvent {
+
+ private final SelectionKey key;
+
+ private final int event;
+ public SelectorEvent(SelectionKey key, int event) {
+ this.key = key;
+ this.event = event;
+ }
+
+ public SelectionKey getKey() {
+ return this.key;
+ }
+
+ public int getEvent() {
+ return event;
+ }
+
+ }
public SerialPollingSelector(SelectorProvider selectorProvider) {
super(selectorProvider);
+ registeredChannels = new ArrayList<>();
}
@Override
public Set<SelectionKey> keys() {
- return new HashSet<>();
+ return new HashSet<>(registeredChannels);
}
+ /**
+ * Returns all keys that are in the events queue
+ * @return the set of keys belonging to the currently queued events
+ */
@Override
public Set<SelectionKey> selectedKeys() {
- return new HashSet<>();
+ return
events.stream().map(SelectorEvent::getKey).collect(Collectors.toSet());
}
@Override
public int selectNow() throws IOException {
- throw new NotImplementedException("");
+ // throw new NotImplementedException("");
+ logger.debug("selectNow()");
+ // check if one channel is active
+ return events.size();
}
@Override
public int select(long timeout) throws IOException {
- throw new NotImplementedException("");
+ logger.debug("select({})", timeout);
+ if (events.size() > 0) {
+ return events.size();
+ }
+ this.selectPromise = new DefaultPromise<>(executor);
+ try {
+ if (selectPromise.await(timeout)) {
+ logger.debug("Promise was cancelled, new Events should be
there.");
+ } else {
+ logger.debug("Promise timed out, expecting no new events.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Was interrupted", e);
+ }
+ return events.size();
}
@Override
@@ -73,10 +131,28 @@ class SerialPollingSelector extends AbstractSelector {
@Override
public Selector wakeup() {
- // TODO do we have to do something here?
+ // throw new NotImplementedException("Not implemented for this
selector, should not be needed.");
+ // NOOP
return this;
}
+ public void addEvent(SelectorEvent event) {
+ logger.debug("Adding Event to Selector, canceling Promise...");
+ this.events.add(event);
+ // Add the OP to the SelectionKey
+ ((SerialSelectionKey) event.key).addReadyOp(event.event);
+ // Complete the promise so that a pending select() returns immediately
+ if (!selectPromise.isDone()) {
+ selectPromise.setSuccess(null);
+ } else {
+ logger.debug("Promise is already cancelled, skipping that.");
+ }
+ }
+
+ public void removeEvent(SerialSelectionKey serialSelectionKey) {
+ events.removeIf(event -> event.key.equals(serialSelectionKey));
+ }
+
@Override
protected void implCloseSelector() throws IOException {
// TODO should we do something here?
@@ -88,7 +164,16 @@ class SerialPollingSelector extends AbstractSelector {
if (!(ch instanceof SerialSocketChannel)) {
throw new IllegalArgumentException("Given channel has to be of
type " + SerialSocketChannel.class);
}
- throw new NotImplementedException("");
+ final SerialSelectionKey key = new SerialSelectionKey(ch, this, ops);
+ // Attach attr
+ key.attach(att);
+ synchronized (this) {
+ // TODO is this always the case??
+ final int index = registeredChannels.size();
+ registeredChannels.add(key);
+ key.setIndex(index);
+ }
+ return key;
}
}
diff --git
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectionKey.java
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectionKey.java
new file mode 100644
index 0000000..1e64b74
--- /dev/null
+++
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectionKey.java
@@ -0,0 +1,62 @@
+package org.apache.plc4x.java.base.connection;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.AbstractSelectionKey;
+
+class SerialSelectionKey extends AbstractSelectionKey {
+
+ private static final Logger logger =
LoggerFactory.getLogger(SerialSelectionKey.class);
+
+ final SelectableChannel channel;
+ final Selector selector;
+ int index;
+ private volatile int interestOps;
+ private int readyOps;
+
+ SerialSelectionKey(SelectableChannel channel, Selector selector, int
interestOps) {
+ this.channel = channel;
+ this.selector = selector;
+ this.interestOps = interestOps;
+ }
+
+ public void setIndex(int index) {
+ this.index = index;
+ }
+
+ @Override public SelectableChannel channel() {
+ return this.channel;
+ }
+
+ @Override public Selector selector() {
+ return this.selector;
+ }
+
+ @Override public int interestOps() {
+ return this.interestOps;
+ }
+
+ @Override public SelectionKey interestOps(int ops) {
+ this.interestOps = ops;
+ return this;
+ }
+
+ @Override public int readyOps() {
+ final int ops = this.readyOps;
+ this.readyOps = 0;
+ // Reset events for this here in Selector
+ ((SerialPollingSelector) selector).removeEvent(this);
+ logger.debug("Returning ready operation {}", ops);
+ return ops;
+ }
+
+ public void addReadyOp(int event) {
+ readyOps = readyOps | event;
+ logger.debug("Adding event {} to ready ops, now having ready ops {}",
event, readyOps);
+ }
+}
diff --git
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
index e1b93bf..3c17605 100644
---
a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
+++
b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
@@ -19,6 +19,10 @@
package org.apache.plc4x.java.base.connection;
+import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
@@ -36,6 +40,10 @@ import java.util.Set;
*/
class SerialSocketChannel extends SocketChannel {
+ private static final Logger logger =
LoggerFactory.getLogger(SerialSocketChannel.class);
+
+ private SerialChannel child = null;
+
/**
* Initializes a new instance of this class.
*
@@ -47,96 +55,104 @@ class SerialSocketChannel extends SocketChannel {
@Override
public SocketChannel bind(SocketAddress local) throws IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public <T> SocketChannel setOption(SocketOption<T> name, T value) throws
IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public SocketChannel shutdownInput() throws IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public SocketChannel shutdownOutput() throws IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public Socket socket() {
- return null;
+ throw new NotImplementedException("");
}
@Override
public boolean isConnected() {
- return false;
+ throw new NotImplementedException("");
}
@Override
public boolean isConnectionPending() {
- return false;
+ throw new NotImplementedException("");
}
@Override
public boolean connect(SocketAddress remote) throws IOException {
- return false;
+ throw new NotImplementedException("");
}
@Override
public boolean finishConnect() throws IOException {
- return false;
+ throw new NotImplementedException("");
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public int read(ByteBuffer dst) throws IOException {
- return 0;
+ throw new NotImplementedException("");
}
@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws
IOException {
- return 0;
+ throw new NotImplementedException("");
}
@Override
public int write(ByteBuffer src) throws IOException {
- return 0;
+ throw new NotImplementedException("");
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws
IOException {
- return 0;
+ throw new NotImplementedException("");
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public <T> T getOption(SocketOption<T> name) throws IOException {
- return null;
+ throw new NotImplementedException("");
}
@Override
public Set<SocketOption<?>> supportedOptions() {
- return null;
+ throw new NotImplementedException("");
}
@Override
protected void implCloseSelectableChannel() throws IOException {
-
+ throw new NotImplementedException("");
}
@Override
protected void implConfigureBlocking(boolean block) throws IOException {
+ logger.debug("Requesting Blocking mode to '{}'", block ? "blocking" :
"non blocking");
+ }
+
+ public SerialChannel getChild() {
+ return child;
+ }
+ public void setChild(SerialChannel child) {
+ this.child = child;
}
}
diff --git
a/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
index b2aa31b..b333cda 100644
---
a/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
+++
b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
@@ -19,12 +19,21 @@
package org.apache.plc4x.java.base.connection;
-import io.netty.channel.ChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.ByteToMessageCodec;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
/**
* TODO write comment
@@ -34,24 +43,39 @@ import static org.junit.Assert.*;
*/
public class SerialChannelFactoryTest {
+ private static final Logger logger =
LoggerFactory.getLogger(SerialChannelFactoryTest.class);
+
@Test
- public void createChannel() throws PlcConnectionException {
+ public void createChannel() throws PlcConnectionException,
InterruptedException, UnknownHostException {
SerialChannelFactory asdf = new SerialChannelFactory("asdf");
- asdf.createChannel(new ChannelHandler() {
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws
Exception {
-
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws
Exception {
-
+ final TcpSocketChannelFactory factory = new
TcpSocketChannelFactory(InetAddress.getLocalHost(), 5432);
+ final Channel channel = asdf.createChannel(new
ChannelInitializer<SerialChannel>() {
+ @Override protected void initChannel(SerialChannel ch) throws
Exception {
+ ch.pipeline().addLast(new DemoCodec());
}
+ });
+ Thread.sleep(100);
+ for (int i = 1; i <= 10; i++) {
+ Thread.sleep(10);
+ SerialChannelHandler.DummyHandler.INSTANCE.fireEvent(1);
+ }
+ Thread.sleep(100);
+ channel.close().sync();
+ }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws Exception {
+ private static class DemoCodec extends ByteToMessageCodec<Object> {
+ @Override protected void encode(ChannelHandlerContext
channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
+ // do nothing here
+ }
+ @Override protected void decode(ChannelHandlerContext
channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+ byteBuf.markReaderIndex();
+ StringBuffer sb = new StringBuffer();
+ for (int i = 1; i <= byteBuf.readableBytes(); i++) {
+ sb.append(byteBuf.readByte() + ", ");
}
- });
+ byteBuf.resetReaderIndex();
+ logger.debug("We currently have {} readable bytes: {}",
byteBuf.readableBytes(), sb.toString());
+ }
}
}
\ No newline at end of file
diff --git a/src/site/asciidoc/developers/writing-driver/writing-driver.adoc
b/src/site/asciidoc/developers/writing-driver/writing-driver.adoc
new file mode 100644
index 0000000..f071fac
--- /dev/null
+++ b/src/site/asciidoc/developers/writing-driver/writing-driver.adoc
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+:imagesdir: ../../images/
+
+== Basic Building Blocks of a Driver / Protocol
+
+The general pipeline for a Protocol looks like the following:
+
+[ditaa,driver-anatomy]
+....
++-------------------------------------+
+|  Upstream (e.g. PLC4X API)          :
++-------------------------------------+
+                   |
+                   :
+                   v
++-------------------------------------+
+|  PLC4X Optimizer                    :
+|  Possibly collapse requests         |
++-------------------------------------+
+                   |
+                   :
+                   v
++-------------------------------------+
+|  Protocol L1                        |
+|  Req. to (generated) Message Objects|
++-------------------------------------+
+                   |
+                   |
+                   v
++-------------------------------------+
+|  Protocol L2                        |
+|  Generated Objects to bytes         |
++-------------------------------------+
+                   |
+                   |
+                   v
++-------------------------------------+
+|  Downstream (e.g. Socket for I/O)   :
++-------------------------------------+
+....
+
+A driver has the task to assemble the pipeline for the protocol(s).
\ No newline at end of file