This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8091bfd692 refactor(plc4x-server): Cleanup, add tests, and add option to specify port number (#1162)
8091bfd692 is described below

commit 8091bfd69218eb87665b3d291b12d46c0e5d0782
Author: Rajmund Takács <[email protected]>
AuthorDate: Wed Oct 18 17:56:29 2023 +0200

    refactor(plc4x-server): Cleanup, add tests, and add option to specify port number (#1162)
---
 plc4j/tools/plc4x-server/pom.xml                   |   7 +
 .../plc4x/java/tools/plc4xserver/Plc4xServer.java  | 143 ++++++++++++++++-----
 .../java/tools/plc4xserver/Plc4xServerTest.java    | 131 +++++++++++++++++++
 3 files changed, 246 insertions(+), 35 deletions(-)

diff --git a/plc4j/tools/plc4x-server/pom.xml b/plc4j/tools/plc4x-server/pom.xml
index 07ed3d2ebf..f650f1b361 100644
--- a/plc4j/tools/plc4x-server/pom.xml
+++ b/plc4j/tools/plc4x-server/pom.xml
@@ -92,6 +92,13 @@
       <version>0.12.0-SNAPSHOT</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-plc4x</artifactId>
+      <version>0.12.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
diff --git a/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java b/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java
index fb8539b9cf..e1dadd43c4 100644
--- a/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java
+++ b/plc4j/tools/plc4x-server/src/main/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServer.java
@@ -16,10 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.plc4x.java.tools.plc4xserver;
 
+import static java.lang.Runtime.getRuntime;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -27,65 +31,129 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.plc4x.java.api.exceptions.PlcException;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.ToIntFunction;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.plc4x.readwrite.Plc4xConstants;
 import org.apache.plc4x.java.plc4x.readwrite.Plc4xMessage;
 import org.apache.plc4x.java.spi.connection.GeneratedProtocolMessageCodec;
 import org.apache.plc4x.java.spi.generation.ByteOrder;
 import org.apache.plc4x.java.tools.plc4xserver.protocol.Plc4xServerAdapter;
-
-import java.util.function.ToIntFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Plc4xServer {
 
+    public static final String SERVER_PORT_PROPERTY = "plc4x.server.port";
+    public static final String SERVER_PORT_ENVIRONMENT_VARIABLE = 
"PLC4X_SERVER_PORT";
+    public static int DEFAULT_PORT = Plc4xConstants.PLC4XTCPDEFAULTPORT;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Plc4xServerAdapter.class);
+
     private EventLoopGroup loopGroup;
     private EventLoopGroup workerGroup;
+    private ChannelFuture channelFuture;
+    private Integer port;
+
+    public static void main(String[] args) throws Exception {
+        final Plc4xServer server = new Plc4xServer();
 
-    public Plc4xServer() {
+        Future<Void> serverFuture = server.start(
+                Arrays.stream(args).findFirst() // port number given as first 
command line argument
+                        .or(() -> 
Optional.ofNullable(System.getProperty(SERVER_PORT_PROPERTY)))
+                        .or(() -> 
Optional.ofNullable(System.getenv(SERVER_PORT_ENVIRONMENT_VARIABLE)))
+                        .map(Integer::parseInt)
+                        .orElse(DEFAULT_PORT)
+        );
+        CompletableFuture<Void> serverRunning = new CompletableFuture<>();
+        getRuntime().addShutdownHook(new Thread(() -> 
serverRunning.complete(null)));
+
+        try {
+            LOG.info("Server is configured to listen on TCP port {}", 
server.getPort());
+            serverFuture.get();
+            LOG.info("Server is ready.");
+            serverRunning.get();
+        } catch (InterruptedException e) {
+            throw new PlcRuntimeException(e);
+        } finally {
+            LOG.info("Server is shutting down...");
+            server.stop();
+        }
     }
 
-    public void start() throws PlcException {
-        if(loopGroup != null) {
-            return;
+    public Integer getPort() {
+        return port;
+    }
+
+    public Future<Void> start() {
+        return start(0);
+    }
+
+    public Future<Void> start(int port) {
+        if (port == 0) {
+            this.port = findRandomFreePort();
+        } else {
+            this.port = port;
         }
 
-        try {
-            loopGroup = new NioEventLoopGroup();
-            workerGroup = new NioEventLoopGroup();
+        if (loopGroup != null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        loopGroup = new NioEventLoopGroup();
+        workerGroup = new NioEventLoopGroup();
 
-            ServerBootstrap bootstrap = new ServerBootstrap();
-            bootstrap.group(loopGroup, workerGroup)
+        channelFuture = new ServerBootstrap()
+                .group(loopGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
-                .childHandler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    public void initChannel(SocketChannel channel) {
-                        ChannelPipeline pipeline = channel.pipeline();
-                        pipeline.addLast(new GeneratedProtocolMessageCodec<>(
-                            Plc4xMessage.class, Plc4xMessage::staticParse, 
ByteOrder.BIG_ENDIAN, null,
-                            new ByteLengthEstimator(), null));
-                        pipeline.addLast(new Plc4xServerAdapter());
-                    }
-                }).option(ChannelOption.SO_BACKLOG, 128)
-                .childOption(ChannelOption.SO_KEEPALIVE, true);
-
-            bootstrap.bind(Plc4xConstants.PLC4XTCPDEFAULTPORT).sync();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PlcException(e);
-        }
+                .childHandler(new SocketChannelChannelInitializer())
+                .option(ChannelOption.SO_BACKLOG, 128)
+                .childOption(ChannelOption.SO_KEEPALIVE, true)
+                .bind(this.port);
+
+        return channelFuture;
     }
 
     public void stop() {
-        if(workerGroup == null) {
+        if (workerGroup == null) {
             return;
         }
 
+        channelFuture.cancel(true);
+
         workerGroup.shutdownGracefully();
         loopGroup.shutdownGracefully();
+
+        workerGroup = null;
+        loopGroup = null;
     }
 
-    /** Estimate the Length of a Packet */
-    public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
+    private static class SocketChannelChannelInitializer extends 
ChannelInitializer<SocketChannel> {
+
+        @Override
+        public void initChannel(SocketChannel channel) {
+            ChannelPipeline pipeline = channel.pipeline();
+            pipeline.addLast(
+                    new GeneratedProtocolMessageCodec<>(
+                            Plc4xMessage.class,
+                            Plc4xMessage::staticParse,
+                            ByteOrder.BIG_ENDIAN,
+                            null,
+                            new ByteLengthEstimator(),
+                            null
+                    )
+            );
+            pipeline.addLast(new Plc4xServerAdapter());
+        }
+    }
+
+    private static class ByteLengthEstimator implements ToIntFunction<ByteBuf> 
{
+
         @Override
         public int applyAsInt(ByteBuf byteBuf) {
             if (byteBuf.readableBytes() >= 3) {
@@ -95,8 +163,13 @@ public class Plc4xServer {
         }
     }
 
-    public static void main(String[] args) throws Exception {
-        new Plc4xServer().start();
+    private static int findRandomFreePort() {
+        final int port;
+        try (ServerSocket socket = new ServerSocket(0)) {
+            port = socket.getLocalPort();
+        } catch (IOException e) {
+            throw new RuntimeException("Couldn't find any free port.", e);
+        }
+        return port;
     }
-
 }
diff --git a/plc4j/tools/plc4x-server/src/test/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServerTest.java b/plc4j/tools/plc4x-server/src/test/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServerTest.java
new file mode 100644
index 0000000000..83f53bb802
--- /dev/null
+++ b/plc4j/tools/plc4x-server/src/test/java/org/apache/plc4x/java/tools/plc4xserver/Plc4xServerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.tools.plc4xserver;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class Plc4xServerTest {
+
+    private static final Plc4xServer SERVER = new Plc4xServer();
+    private static final String CONNECTION_STRING_TEMPLATE = 
"plc4x://localhost:%d?remote-connection-string=%s";
+    private static final String CONNECTION_STRING_SIMULATED_ENCODED = 
"simulated%3A%2F%2Flocalhost";
+    private static final long TIMEOUT_VALUE = 10;
+    private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+    private final PlcConnectionManager connectionManager = new 
DefaultPlcDriverManager();
+
+    @BeforeAll
+    public static void setUp() throws ExecutionException, 
InterruptedException, TimeoutException {
+        SERVER.start().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        SERVER.stop();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        final PlcWriteResponse response;
+
+        try (PlcConnection connection = connectionManager.getConnection(
+                String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), 
CONNECTION_STRING_SIMULATED_ENCODED))) {
+            final PlcWriteRequest request = connection.writeRequestBuilder()
+                    .addTagAddress(
+                            "foo",
+                            "STATE/foo:DINT",
+                            42
+                    )
+                    .build();
+            response = request.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+        }
+
+        assertEquals(PlcResponseCode.OK, response.getResponseCode("foo"));
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final PlcReadResponse response;
+
+        try (PlcConnection connection = connectionManager.getConnection(
+                String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), 
CONNECTION_STRING_SIMULATED_ENCODED))) {
+            final PlcReadRequest request = connection.readRequestBuilder()
+                    .addTagAddress(
+                            "foo",
+                            "RANDOM/foo:DINT"
+                    )
+                    .build();
+            response = request.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
+        }
+
+        assertEquals(PlcResponseCode.OK, response.getResponseCode("foo"));
+        assertNotNull(response.getPlcValue("foo"));
+        assertInstanceOf(Integer.class, 
response.getPlcValue("foo").getObject());
+    }
+
+    @Test
+    public void testReadWriteSameConnection() throws Exception {
+        final PlcWriteResponse writeResponse;
+        final PlcReadResponse readResponse;
+
+        try (PlcConnection connection = connectionManager.getConnection(
+                String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), 
CONNECTION_STRING_SIMULATED_ENCODED))) {
+            final PlcWriteRequest writeRequest = 
connection.writeRequestBuilder()
+                    .addTagAddress(
+                            "foo",
+                            "STATE/foo:DINT",
+                            21
+                    )
+                    .build();
+            writeResponse = writeRequest.execute().get(TIMEOUT_VALUE, 
TIMEOUT_UNIT);
+
+            final PlcReadRequest readRequest = connection.readRequestBuilder()
+                    .addTagAddress(
+                            "foo",
+                            "STATE/foo:DINT"
+                    )
+                    .build();
+            readResponse = readRequest.execute().get(TIMEOUT_VALUE, 
TIMEOUT_UNIT);
+        }
+
+        assertEquals(PlcResponseCode.OK, writeResponse.getResponseCode("foo"));
+        assertEquals(PlcResponseCode.OK, readResponse.getResponseCode("foo"));
+
+        assertInstanceOf(Integer.class, 
readResponse.getPlcValue("foo").getObject());
+        assertEquals(21, readResponse.getInteger("foo"));
+    }
+}

Reply via email to