This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 8091bfd692 refactor(plc4x-server): Cleanup, add tests, and add option
to specify port number (#1162)
8091bfd692 is described below
commit 8091bfd69218eb87665b3d291b12d46c0e5d0782
Author: Rajmund Takács <[email protected]>
AuthorDate: Wed Oct 18 17:56:29 2023 +0200
refactor(plc4x-server): Cleanup, add tests, and add option to specify port
number (#1162)
---
plc4j/tools/plc4x-server/pom.xml | 7 +
.../plc4x/java/tools/plc4xserver/Plc4xServer.java | 143 ++++++++++++++++-----
.../java/tools/plc4xserver/Plc4xServerTest.java | 131 +++++++++++++++++++
3 files changed, 246 insertions(+), 35 deletions(-)
diff --git a/plc4j/tools/plc4x-server/pom.xml b/plc4j/tools/plc4x-server/pom.xml
index 07ed3d2ebf..f650f1b361 100644
--- a/plc4j/tools/plc4x-server/pom.xml
+++ b/plc4j/tools/plc4x-server/pom.xml
@@ -92,6 +92,13 @@
<version>0.12.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-plc4x</artifactId>
+ <version>0.12.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
diff --git a/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java b/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java
index fb8539b9cf..e1dadd43c4 100644
--- a/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java
+++ b/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java
@@ -16,10 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.plc4x.java.tools.plc4xserver;
+import static java.lang.Runtime.getRuntime;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -27,65 +31,129 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.plc4x.java.api.exceptions.PlcException;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.ToIntFunction;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.plc4x.readwrite.Plc4xConstants;
import org.apache.plc4x.java.plc4x.readwrite.Plc4xMessage;
import org.apache.plc4x.java.spi.connection.GeneratedProtocolMessageCodec;
import org.apache.plc4x.java.spi.generation.ByteOrder;
import org.apache.plc4x.java.tools.plc4xserver.protocol.Plc4xServerAdapter;
-
-import java.util.function.ToIntFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Plc4xServer {
+ public static final String SERVER_PORT_PROPERTY = "plc4x.server.port";
+ public static final String SERVER_PORT_ENVIRONMENT_VARIABLE = "PLC4X_SERVER_PORT";
+ public static int DEFAULT_PORT = Plc4xConstants.PLC4XTCPDEFAULTPORT;
+
+ private static final Logger LOG = LoggerFactory.getLogger(Plc4xServerAdapter.class);
+
private EventLoopGroup loopGroup;
private EventLoopGroup workerGroup;
+ private ChannelFuture channelFuture;
+ private Integer port;
+
+ public static void main(String[] args) throws Exception {
+ final Plc4xServer server = new Plc4xServer();
- public Plc4xServer() {
+ Future<Void> serverFuture = server.start(
+ Arrays.stream(args).findFirst() // port number given as first command line argument
+ .or(() -> Optional.ofNullable(System.getProperty(SERVER_PORT_PROPERTY)))
+ .or(() -> Optional.ofNullable(System.getenv(SERVER_PORT_ENVIRONMENT_VARIABLE)))
+ .map(Integer::parseInt)
+ .orElse(DEFAULT_PORT)
+ );
+ CompletableFuture<Void> serverRunning = new CompletableFuture<>();
+ getRuntime().addShutdownHook(new Thread(() -> serverRunning.complete(null)));
+
+ try {
+ LOG.info("Server is configured to listen on TCP port {}", server.getPort());
+ serverFuture.get();
+ LOG.info("Server is ready.");
+ serverRunning.get();
+ } catch (InterruptedException e) {
+ throw new PlcRuntimeException(e);
+ } finally {
+ LOG.info("Server is shutting down...");
+ server.stop();
+ }
}
- public void start() throws PlcException {
- if(loopGroup != null) {
- return;
+ public Integer getPort() {
+ return port;
+ }
+
+ public Future<Void> start() {
+ return start(0);
+ }
+
+ public Future<Void> start(int port) {
+ if (port == 0) {
+ this.port = findRandomFreePort();
+ } else {
+ this.port = port;
}
- try {
- loopGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
+ if (loopGroup != null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ loopGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(loopGroup, workerGroup)
+ channelFuture = new ServerBootstrap()
+ .group(loopGroup, workerGroup)
.channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) {
- ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast(new GeneratedProtocolMessageCodec<>(
- Plc4xMessage.class, Plc4xMessage::staticParse, ByteOrder.BIG_ENDIAN, null,
- new ByteLengthEstimator(), null));
- pipeline.addLast(new Plc4xServerAdapter());
- }
- }).option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
-
- bootstrap.bind(Plc4xConstants.PLC4XTCPDEFAULTPORT).sync();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PlcException(e);
- }
+ .childHandler(new SocketChannelChannelInitializer())
+ .option(ChannelOption.SO_BACKLOG, 128)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .bind(this.port);
+
+ return channelFuture;
}
public void stop() {
- if(workerGroup == null) {
+ if (workerGroup == null) {
return;
}
+ channelFuture.cancel(true);
+
workerGroup.shutdownGracefully();
loopGroup.shutdownGracefully();
+
+ workerGroup = null;
+ loopGroup = null;
}
- /** Estimate the Length of a Packet */
- public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
+ private static class SocketChannelChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ @Override
+ public void initChannel(SocketChannel channel) {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast(
+ new GeneratedProtocolMessageCodec<>(
+ Plc4xMessage.class,
+ Plc4xMessage::staticParse,
+ ByteOrder.BIG_ENDIAN,
+ null,
+ new ByteLengthEstimator(),
+ null
+ )
+ );
+ pipeline.addLast(new Plc4xServerAdapter());
+ }
+ }
+
+ private static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
+
@Override
public int applyAsInt(ByteBuf byteBuf) {
if (byteBuf.readableBytes() >= 3) {
@@ -95,8 +163,13 @@ public class Plc4xServer {
}
}
- public static void main(String[] args) throws Exception {
- new Plc4xServer().start();
+ private static int findRandomFreePort() {
+ final int port;
+ try (ServerSocket socket = new ServerSocket(0)) {
+ port = socket.getLocalPort();
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't find any free port.", e);
+ }
+ return port;
}
-
}
diff --git a/plc4j/tools/plc4x-server/src/test/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServerTest.java b/plc4j/tools/plc4x-server/src/test/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServerTest.java
new file mode 100644
index 0000000000..83f53bb802
--- /dev/null
+++ b/plc4j/tools/plc4x-server/src/test/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.tools.plc4xserver;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class Plc4xServerTest {
+
+ private static final Plc4xServer SERVER = new Plc4xServer();
+ private static final String CONNECTION_STRING_TEMPLATE = "plc4x://localhost:%d?remote-connection-string=%s";
+ private static final String CONNECTION_STRING_SIMULATED_ENCODED = "simulated%3A%2F%2Flocalhost";
+ private static final long TIMEOUT_VALUE = 10;
+ private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+ private final PlcConnectionManager connectionManager = new DefaultPlcDriverManager();
+
+ @BeforeAll
+ public static void setUp() throws ExecutionException, InterruptedException, TimeoutException {
+ SERVER.start().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ SERVER.stop();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ final PlcWriteResponse response;
+
+ try (PlcConnection connection = connectionManager.getConnection(
+ String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), CONNECTION_STRING_SIMULATED_ENCODED))) {
+ final PlcWriteRequest request = connection.writeRequestBuilder()
+ .addTagAddress(
+ "foo",
+ "STATE/foo:DINT",
+ 42
+ )
+ .build();
+ response = request.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+ }
+
+ assertEquals(PlcResponseCode.OK, response.getResponseCode("foo"));
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final PlcReadResponse response;
+
+ try (PlcConnection connection = connectionManager.getConnection(
+ String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), CONNECTION_STRING_SIMULATED_ENCODED))) {
+ final PlcReadRequest request = connection.readRequestBuilder()
+ .addTagAddress(
+ "foo",
+ "RANDOM/foo:DINT"
+ )
+ .build();
+ response = request.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+ }
+
+ assertEquals(PlcResponseCode.OK, response.getResponseCode("foo"));
+ assertNotNull(response.getPlcValue("foo"));
+ assertInstanceOf(Integer.class, response.getPlcValue("foo").getObject());
+ }
+
+ @Test
+ public void testReadWriteSameConnection() throws Exception {
+ final PlcWriteResponse writeResponse;
+ final PlcReadResponse readResponse;
+
+ try (PlcConnection connection = connectionManager.getConnection(
+ String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), CONNECTION_STRING_SIMULATED_ENCODED))) {
+ final PlcWriteRequest writeRequest = connection.writeRequestBuilder()
+ .addTagAddress(
+ "foo",
+ "STATE/foo:DINT",
+ 21
+ )
+ .build();
+ writeResponse = writeRequest.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+
+ final PlcReadRequest readRequest = connection.readRequestBuilder()
+ .addTagAddress(
+ "foo",
+ "STATE/foo:DINT"
+ )
+ .build();
+ readResponse = readRequest.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+ }
+
+ assertEquals(PlcResponseCode.OK, writeResponse.getResponseCode("foo"));
+ assertEquals(PlcResponseCode.OK, readResponse.getResponseCode("foo"));
+
+ assertInstanceOf(Integer.class, readResponse.getPlcValue("foo").getObject());
+ assertEquals(21, readResponse.getInteger("foo"));
+ }
+}