This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch detached in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit ec120effb5e53bf50eb01a831f9ec06dddf448f5 Author: Christofer Dutz <[email protected]> AuthorDate: Mon Feb 2 10:08:54 2026 +0100 feat: Added the API module for the new transports. --- plc4j/transports/api/pom.xml | 87 ++++ .../transports/api/AsyncTransportInstance.java | 77 +++ .../java/transports/api/BaseTransportInstance.java | 47 ++ .../transports/api/BlockingTransportInstance.java | 68 +++ .../transports/api/DefaultTransportManager.java | 64 +++ .../java/transports/api/OsgiTransportManager.java | 65 +++ .../plc4x/java/transports/api/RingBuffer.java | 378 ++++++++++++++ .../plc4x/java/transports/api/Transport.java | 58 +++ .../java/transports/api/TransportInstance.java | 85 +++ .../java/transports/api/TransportManager.java | 28 + .../api/config/TransportConfiguration.java | 25 + .../api/exceptions/TransportException.java | 34 ++ .../transports/api/AsyncTransportInstanceTest.java | 265 ++++++++++ .../transports/api/BaseTransportInstanceTest.java | 167 ++++++ .../api/BlockingTransportInstanceTest.java | 179 +++++++ .../api/DefaultTransportManagerTest.java | 84 +++ .../transports/api/OsgiTransportManagerTest.java | 111 ++++ .../plc4x/java/transports/api/RingBufferTest.java | 568 +++++++++++++++++++++ .../plc4x/java/transports/api/TransportTest.java | 104 ++++ .../api/exceptions/TransportExceptionTest.java | 62 +++ plc4j/transports/pom.xml | 1 + pom.xml | 7 + 22 files changed, 2564 insertions(+) diff --git a/plc4j/transports/api/pom.xml b/plc4j/transports/api/pom.xml new file mode 100644 index 0000000000..683b87113d --- /dev/null +++ b/plc4j/transports/api/pom.xml @@ -0,0 +1,87 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-transports</artifactId> + <version>0.14.0-SNAPSHOT</version> + </parent> + + <artifactId>plc4j-transports-api</artifactId> + + <name>PLC4J: Transports: API</name> + + <properties> + <project.build.outputTimestamp>2024-02-16T14:53:02Z</project.build.outputTimestamp> + </properties> + + <dependencies> + <!-- PLC4J API --> + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-api</artifactId> + <version>0.14.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-spi-config</artifactId> + <version>0.14.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-utils-audit-log-api</artifactId> + <version>0.14.0-SNAPSHOT</version> + </dependency> + + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- OSGi Dependencies --> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.component.annotations</artifactId> + </dependency> + + <!-- Test Dependencies --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/AsyncTransportInstance.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/AsyncTransportInstance.java new file mode 100644 index 0000000000..28b4bd455c --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/AsyncTransportInstance.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; + +import java.util.function.Consumer; + +/** + * Extension of TransportInstance that supports asynchronous, event-driven I/O. + * This eliminates the need for polling loops. + * <p> + * Implementations register with the underlying I/O mechanism (e.g., NIO Selector) + * and notify registered listeners when data becomes available, enabling zero-polling, + * event-driven architectures. + * + * @param <T> the configuration type + */ +public interface AsyncTransportInstance<T extends TransportConfiguration> extends TransportInstance<T> { + + /** + * Registers a listener that will be called when data becomes available. + * This allows for event-driven architectures without polling. + * <p> + * The transport implementation is responsible for monitoring the underlying + * I/O channel and invoking this listener when data arrives. + * + * @param listener callback invoked when data is available + */ + void registerDataListener(Runnable listener); + + /** + * Removes the data available listener, stopping event notifications. + */ + void removeDataListener(); + + /** + * Registers a listener that will be called when the transport is disconnected. + * <p> + * This is critical for properly handling connection failures - any pending + * operations waiting for responses should be completed exceptionally when + * the transport dies, rather than waiting for a timeout. + * <p> + * The listener receives the exception that caused the disconnect, or null + * if the disconnect was graceful (e.g., the remote side closed normally). + * + * @param listener callback invoked when the transport disconnects + */ + default void registerDisconnectListener(Consumer<Throwable> listener) { + // Default no-op for backward compatibility + } + + /** + * Removes the disconnect listener. + */ + default void removeDisconnectListener() { + // Default no-op for backward compatibility + } + +} \ No newline at end of file diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/BaseTransportInstance.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/BaseTransportInstance.java new file mode 100644 index 0000000000..20a562bf73 --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/BaseTransportInstance.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.utils.auditlog.api.AuditLog; +import org.apache.plc4x.java.utils.auditlog.api.AuditLogEventType; + +import java.util.Objects; + +public abstract class BaseTransportInstance<T extends TransportConfiguration> implements TransportInstance<T> { + + private final T transportConfig; + private final AuditLog auditLog; + + public BaseTransportInstance(T transportConfig, AuditLog auditLog) { + this.transportConfig = Objects.requireNonNull(transportConfig); + this.auditLog = auditLog; + auditLog.write(AuditLogEventType.SYSTEM, "Creating Transport with config", transportConfig); + } + + public T getConfiguration() { + return transportConfig; + } + + public AuditLog getAuditLog() { + return auditLog; + } + +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/BlockingTransportInstance.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/BlockingTransportInstance.java new file mode 100644 index 0000000000..b2075982cf --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/BlockingTransportInstance.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +/** + * Extension of TransportInstance that supports blocking reads with timeout. + * This allows the receive loop to block until data is available, eliminating + * the need for polling with Thread.sleep(). + * + * @param <T> the configuration type + */ +public interface BlockingTransportInstance<T extends TransportConfiguration> extends TransportInstance<T> { + + /** + * Blocks until at least one byte is available or the timeout expires. + * This method eliminates the need for polling loops with Thread.sleep(). + * + * @param timeout maximum time to wait for data + * @throws TransportException if an I/O error occurs + * @throws TimeoutException if the timeout expires before data is available + * @throws InterruptedException if the thread is interrupted while waiting + */ + void waitForData(Duration timeout) throws TransportException, TimeoutException, InterruptedException; + + /** + * Blocks until the specified number of bytes are available or the timeout expires. + * + * @param numBytes minimum number of bytes to wait for + * @param timeout maximum time to wait + * @throws TransportException if an I/O error occurs + * @throws TimeoutException if the timeout expires + * @throws InterruptedException if the thread is interrupted while waiting + */ + default void waitForBytes(int numBytes, Duration timeout) throws TransportException, TimeoutException, InterruptedException { + long deadline = System.nanoTime() + timeout.toNanos(); + while (getNumBytesAvailable() < numBytes) { + long remaining = deadline - System.nanoTime(); + if (remaining <= 0) { + throw new TimeoutException("Timeout waiting for " + numBytes + " bytes"); + } + waitForData(Duration.ofNanos(remaining)); + } + } + +} \ No newline at end of file diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/DefaultTransportManager.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/DefaultTransportManager.java new file mode 100644 index 0000000000..07ac9b9b17 --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/DefaultTransportManager.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceLoader; + +public class DefaultTransportManager implements TransportManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTransportManager.class); + + protected final ClassLoader classLoader; + + private final Map<String, Transport> transportMap; + + public DefaultTransportManager() { + this(Thread.currentThread().getContextClassLoader()); + } + + public DefaultTransportManager(ClassLoader classLoader) { + LOGGER.info("Instantiating new Transport Manager with class loader {}", classLoader); + this.classLoader = classLoader; + transportMap = new HashMap<>(); + ServiceLoader<Transport> transportLoader = ServiceLoader.load(Transport.class, classLoader); + LOGGER.info("Registering available transports..."); + for (Transport transport : transportLoader) { + if (transportMap.containsKey(transport.getTransportCode())) { + throw new IllegalStateException( + "Multiple transport implementations available for transport code '" + + transport.getTransportName() + "'"); + } + LOGGER.info("Registering transport {} ({})", transport.getTransportCode(), transport.getTransportName()); + transportMap.put(transport.getTransportCode(), transport); + } + } + + @Override + public Optional<Transport> getTransport(String transportCode) { + return Optional.ofNullable(transportMap.get(transportCode)); + } + +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/OsgiTransportManager.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/OsgiTransportManager.java new file mode 100644 index 0000000000..efbce07629 --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/OsgiTransportManager.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +@Component(service = TransportManager.class, immediate = true) +public class OsgiTransportManager implements TransportManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(OsgiTransportManager.class); + + private final Map<String, Transport<?>> transportMap = new ConcurrentHashMap<>(); + + @Reference( + cardinality = ReferenceCardinality.MULTIPLE, + policy = ReferencePolicy.DYNAMIC + ) + protected void bindTransport(Transport<?> transport) { + String transportCode = transport.getTransportCode(); + LOGGER.info("Registering transport {} ({})", transportCode, transport.getTransportName()); + + Transport<?> existing = transportMap.putIfAbsent(transportCode, transport); + if (existing != null) { + LOGGER.warn("Transport with code '{}' is already registered. Ignoring duplicate.", transportCode); + } + } + + protected void unbindTransport(Transport<?> transport) { + String transportCode = transport.getTransportCode(); + LOGGER.info("Unregistering transport {} ({})", transportCode, transport.getTransportName()); + transportMap.remove(transportCode); + } + + @Override + public Optional<Transport> getTransport(String transportCode) { + return Optional.ofNullable(transportMap.get(transportCode)); + } + +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/RingBuffer.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/RingBuffer.java new file mode 100644 index 0000000000..f202cae5f8 --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/RingBuffer.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Thread-safe ring buffer implementation for efficient byte storage and retrieval. + * This implementation uses a circular buffer with separate read and write positions, + * allowing for efficient peek and read operations without data copying. + * <p> + * Key features: + * - Thread-safe operations with fine-grained locking + * - Zero-copy peek operations + * - Automatic wrap-around handling + * - Dynamic capacity management + */ +public class RingBuffer { + + private final byte[] buffer; + private final int capacity; + private final Lock lock = new ReentrantLock(); + + private int readPosition = 0; + private int writePosition = 0; + private int available = 0; + + /** + * Creates a new ring buffer with the specified capacity. + * + * @param capacity the maximum number of bytes the buffer can hold + * @throws IllegalArgumentException if capacity is less than or equal to zero + */ + public RingBuffer(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException("Capacity must be greater than zero"); + } + this.capacity = capacity; + this.buffer = new byte[capacity]; + } + + /** + * Returns the number of bytes available for reading. + * + * @return number of bytes available + */ + public int availableForReading() { + lock.lock(); + try { + return available; + } finally { + lock.unlock(); + } + } + + /** + * Returns the number of bytes that can be written to the buffer. + * + * @return number of bytes of free space + */ + public int remainingForWriting() { + lock.lock(); + try { + return capacity - available; + } finally { + lock.unlock(); + } + } + + /** + * Returns the total capacity of the buffer. + * + * @return buffer capacity in bytes + */ + public int capacity() { + return capacity; + } + + /** + * Clears all data from the buffer, resetting read and write positions. + */ + public void clear() { + lock.lock(); + try { + readPosition = 0; + writePosition = 0; + available = 0; + } finally { + lock.unlock(); + } + } + + /** + * Writes bytes to the buffer. + * + * @param data the byte array to write + * @return number of bytes actually written + * @throws IllegalArgumentException if data is null + */ + public int write(byte[] data) { + if (data == null) { + throw new IllegalArgumentException("Data cannot be null"); + } + return write(data, 0, data.length); + } + + /** + * Writes bytes to the buffer from the specified offset and length. + * + * @param data the byte array to write from + * @param offset the offset in the data array to start from + * @param length the number of bytes to write + * @return number of bytes actually written + * @throws IllegalArgumentException if data is null or offset/length are invalid + */ + public int write(byte[] data, int offset, int length) { + if (data == null) { + throw new IllegalArgumentException("Data cannot be null"); + } + if (offset < 0 || length < 0 || offset + length > data.length) { + throw new IllegalArgumentException("Invalid offset or length"); + } + if (length == 0) { + return 0; + } + + lock.lock(); + try { + int bytesToWrite = Math.min(length, capacity - available); + if (bytesToWrite == 0) { + return 0; + } + + // Handle wrap-around + int firstChunk = Math.min(bytesToWrite, capacity - writePosition); + System.arraycopy(data, offset, buffer, writePosition, firstChunk); + + if (firstChunk < bytesToWrite) { + // Wrapped around to the beginning + int secondChunk = bytesToWrite - firstChunk; + System.arraycopy(data, offset + firstChunk, buffer, 0, secondChunk); + writePosition = secondChunk; + } else { + writePosition = (writePosition + firstChunk) % capacity; + } + + available += bytesToWrite; + return bytesToWrite; + + } finally { + lock.unlock(); + } + } + + /** + * Writes bytes from a ByteBuffer to the ring buffer. + * This method avoids creating an intermediate byte array, making it more efficient + * for NIO operations. The ByteBuffer's position will be advanced by the number of + * bytes written. + * + * @param byteBuffer the ByteBuffer to read from (must be in read mode with position at start of data) + * @return number of bytes actually written + * @throws IllegalArgumentException if byteBuffer is null + */ + public int write(java.nio.ByteBuffer byteBuffer) { + if (byteBuffer == null) { + throw new IllegalArgumentException("ByteBuffer cannot be null"); + } + + int length = byteBuffer.remaining(); + if (length == 0) { + return 0; + } + + lock.lock(); + try { + int bytesToWrite = Math.min(length, capacity - available); + if (bytesToWrite == 0) { + return 0; + } + + // Handle wrap-around - write directly from ByteBuffer to internal buffer + int firstChunk = Math.min(bytesToWrite, capacity - writePosition); + byteBuffer.get(buffer, writePosition, firstChunk); + + if (firstChunk < bytesToWrite) { + // Wrapped around to the beginning + int secondChunk = bytesToWrite - firstChunk; + byteBuffer.get(buffer, 0, secondChunk); + writePosition = secondChunk; + } else { + writePosition = (writePosition + firstChunk) % capacity; + } + + available += bytesToWrite; + return bytesToWrite; + + } finally { + lock.unlock(); + } + } + + /** + * Peeks at bytes in the buffer without consuming them. + * Returns at most numBytes, but may return fewer if not enough data is available. + * + * @param numBytes the number of bytes to peek + * @return byte array containing the peeked data (may be smaller than requested) + * @throws IllegalArgumentException if numBytes is negative + */ + public byte[] peek(int numBytes) { + if (numBytes < 0) { + throw new IllegalArgumentException("Number of bytes cannot be negative"); + } + if (numBytes == 0) { + return new byte[0]; + } + + lock.lock(); + try { + int bytesToPeek = Math.min(numBytes, available); + if (bytesToPeek == 0) { + return new byte[0]; + } + + byte[] result = new byte[bytesToPeek]; + + // Handle wrap-around + int firstChunk = Math.min(bytesToPeek, capacity - readPosition); + System.arraycopy(buffer, readPosition, result, 0, firstChunk); + + if (firstChunk < bytesToPeek) { + // Wrapped around to the beginning + int secondChunk = bytesToPeek - firstChunk; + System.arraycopy(buffer, 0, result, firstChunk, secondChunk); + } + + return result; + + } finally { + lock.unlock(); + } + } + + /** + * Reads and consumes bytes from the buffer. + * Returns at most numBytes, but may return fewer if not enough data is available. + * + * @param numBytes the number of bytes to read + * @return byte array containing the read data (may be smaller than requested) + * @throws IllegalArgumentException if numBytes is negative + */ + public byte[] read(int numBytes) { + if (numBytes < 0) { + throw new IllegalArgumentException("Number of bytes cannot be negative"); + } + if (numBytes == 0) { + return new byte[0]; + } + + lock.lock(); + try { + int bytesToRead = Math.min(numBytes, available); + if (bytesToRead == 0) { + return new byte[0]; + } + + byte[] result = new byte[bytesToRead]; + + // Handle wrap-around + int firstChunk = Math.min(bytesToRead, capacity - readPosition); + System.arraycopy(buffer, readPosition, result, 0, firstChunk); + + if (firstChunk < bytesToRead) { + // Wrapped around to the beginning + int secondChunk = bytesToRead - firstChunk; + System.arraycopy(buffer, 0, result, firstChunk, secondChunk); + readPosition = secondChunk; + } else { + readPosition = (readPosition + firstChunk) % capacity; + } + + available -= bytesToRead; + return result; + + } finally { + lock.unlock(); + } + } + + /** + * Skips (discards) up to numBytes from the buffer. + * + * @param numBytes the number of bytes to skip + * @return the actual number of bytes skipped + * @throws IllegalArgumentException if numBytes is negative + */ + public int skip(int numBytes) { + if (numBytes < 0) { + throw new IllegalArgumentException("Number of bytes cannot be negative"); + } + if (numBytes == 0) { + return 0; + } + + lock.lock(); + try { + int bytesToSkip = Math.min(numBytes, available); + if (bytesToSkip == 0) { + return 0; + } + + readPosition = (readPosition + bytesToSkip) % capacity; + available -= bytesToSkip; + return bytesToSkip; + + } finally { + lock.unlock(); + } + } + + /** + * Compacts the buffer by moving all available data to the beginning. + * This is useful for ensuring contiguous space for writing. + * Note: This operation is O(n) and should be used sparingly. + */ + public void compact() { + lock.lock(); + try { + if (available == 0 || readPosition == 0) { + // Nothing to compact or already at the beginning + writePosition = available; + readPosition = 0; + return; + } + + if (readPosition < writePosition) { + // No wrap-around, simple case + if (readPosition > 0) { + System.arraycopy(buffer, readPosition, buffer, 0, available); + } + } else { + // Wrap-around case + byte[] temp = new byte[available]; + int firstChunk = capacity - readPosition; + System.arraycopy(buffer, readPosition, temp, 0, firstChunk); + if (firstChunk < available) { + System.arraycopy(buffer, 0, temp, firstChunk, available - firstChunk); + } + System.arraycopy(temp, 0, buffer, 0, available); + } + + readPosition = 0; + writePosition = available; + + } finally { + lock.unlock(); + } + } +} \ No newline at end of file diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/Transport.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/Transport.java new file mode 100644 index 0000000000..89fabf8acd --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/Transport.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; +import org.apache.plc4x.java.utils.auditlog.api.AuditLog; + +public interface Transport<T extends TransportConfiguration> { + + /** + * @return the code used in a connection string to refer to this transport + */ + String getTransportCode(); + + /** + * @return A human-readable name for this transport + */ + String getTransportName(); + + /** + * Returns the configuration class for this transport. + * This is mainly used for systems where drivers are programmatically included. + * + * @return the configuration class for this transport + */ + default Class<T> getTransportConfigType() { + return null; + } + + /** + * Returns an instance of the current transport for the given configuration. + * + * @param transportUrl the URL of the transport to connect to + * @param configuration configuration for the transport + * @return an instance of the current transport for the given configuration + * @throws TransportException something went wrong + */ + TransportInstance<T> createTransportInstance(String transportUrl, TransportConfiguration configuration, AuditLog auditLog) throws TransportException; + +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/TransportInstance.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/TransportInstance.java new file mode 100644 index 0000000000..50ca90c55d --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/TransportInstance.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; + +public interface TransportInstance<T extends TransportConfiguration> { + + /** + * Returns the configuration for this transport instance. + * + * @return the configuration for this transport instance. + */ + T getConfiguration(); + + /** + * Executes a check if the underlying transport is still open. + * + * @return true if the transport is still open, false otherwise. + */ + boolean isOpen(); + + /** + * Returns the number of bytes available for reading. + * + * @return number of bytes available + * @throws TransportException something went wrong + */ + int getNumBytesAvailable() throws TransportException; + + /** + * Returns the given number of bytes from the buffer without moving the read position. + * Some protocols only require a header to know the size of an incoming message, so some drivers will just peek + * the size of the header and try to extract the message size from that. If the required number of bytes is + * available, they will consume these bytes via the read method. + * + * @param numBytes number of bytes to peek + * @return byte array containing the peeked bytes + * @throws TransportException something went wrong + */ + byte[] peekReadableBytes(int numBytes) throws TransportException; + + /** + * Return the given number of bytes and actively mark them as read. + * + * @param numBytes number of bytes to read + * @return byte array containing the read bytes + * @throws TransportException something went wrong + */ + byte[] read(int numBytes) throws TransportException; + + /** + * Simply writes the given bytes to the transport. + * + * @param bytes byte array to write + * @throws TransportException something went wrong + */ + void write(byte[] bytes) throws TransportException; + + /** + * Closes the transport. + * + * @throws TransportException something went wrong + */ + void close() throws TransportException; + +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/TransportManager.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/TransportManager.java new file mode 100644 index 0000000000..69eb773a02 --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/TransportManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import java.util.Optional; + +public interface TransportManager { + + Optional<Transport> getTransport(String transportCode); + +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/config/TransportConfiguration.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/config/TransportConfiguration.java new file mode 100644 index 0000000000..c11850697e --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/config/TransportConfiguration.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api.config; + +import org.apache.plc4x.java.spi.config.Configuration; + +public interface TransportConfiguration extends Configuration { +} diff --git a/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/exceptions/TransportException.java b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/exceptions/TransportException.java new file mode 100644 index 0000000000..8bb5cdacef --- /dev/null +++ b/plc4j/transports/api/src/main/java/org/apache/plc4x/java/transports/api/exceptions/TransportException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api.exceptions; + +import org.apache.plc4x.java.api.exceptions.PlcException; + +public class TransportException extends PlcException { + + public TransportException(String message) { + super(message); + } + + public TransportException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/AsyncTransportInstanceTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/AsyncTransportInstanceTest.java new file mode 100644 index 0000000000..8c4529ef92 --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/AsyncTransportInstanceTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.*; + +class AsyncTransportInstanceTest { + + @Test + void testDefaultDisconnectListenerMethods_noOp() { + // Create a minimal implementation that uses the default disconnect listener methods + AsyncTransportInstance<TransportConfiguration> instance = new MinimalAsyncTransportInstance(); + + // Default methods should not throw - they are no-ops for backward compatibility + assertDoesNotThrow(() -> instance.registerDisconnectListener(cause -> {})); + assertDoesNotThrow(() -> instance.removeDisconnectListener()); + } + + @Test + void testDefaultDisconnectListenerMethods_withNullListener() { + AsyncTransportInstance<TransportConfiguration> instance = new MinimalAsyncTransportInstance(); + + // Should handle null listener gracefully (no-op implementation) + assertDoesNotThrow(() -> instance.registerDisconnectListener(null)); + } + + @Test + void testCustomDisconnectListenerImplementation() { + // Create an implementation that actually uses the disconnect listener + CustomAsyncTransportInstance instance = new CustomAsyncTransportInstance(); + + AtomicBoolean listenerCalled = new AtomicBoolean(false); + AtomicReference<Throwable> receivedCause = new AtomicReference<>(); + + // Register a listener + instance.registerDisconnectListener(cause -> { + listenerCalled.set(true); + receivedCause.set(cause); + }); + + // Simulate a disconnect + RuntimeException testException = new RuntimeException("Test disconnect"); + instance.simulateDisconnect(testException); + + // Verify the listener was called with the correct cause + assertTrue(listenerCalled.get()); + assertEquals(testException, receivedCause.get()); + } + + @Test + void testCustomDisconnectListenerImplementation_gracefulDisconnect() { + CustomAsyncTransportInstance instance = new CustomAsyncTransportInstance(); + + AtomicBoolean listenerCalled = new AtomicBoolean(false); + // Use a sentinel exception to detect if the listener was called with null + RuntimeException sentinel = new RuntimeException("SENTINEL"); + AtomicReference<Throwable> receivedCause = new AtomicReference<>(sentinel); + + instance.registerDisconnectListener(cause -> { + listenerCalled.set(true); + receivedCause.set(cause); + }); + + // Simulate a graceful disconnect (null cause) + instance.simulateDisconnect(null); + + assertTrue(listenerCalled.get()); + assertNull(receivedCause.get()); + } + + @Test + void testRemoveDisconnectListener() { + CustomAsyncTransportInstance instance = new CustomAsyncTransportInstance(); + + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + instance.registerDisconnectListener(cause -> listenerCalled.set(true)); + instance.removeDisconnectListener(); + + // After removing, disconnect should not call the listener + instance.simulateDisconnect(new RuntimeException("Test")); + + assertFalse(listenerCalled.get()); + } + + @Test + void testDataListenerMethods() { + CustomAsyncTransportInstance instance = new CustomAsyncTransportInstance(); + + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + // Register a data listener + instance.registerDataListener(() -> listenerCalled.set(true)); + + // Simulate data available + instance.simulateDataAvailable(); + + assertTrue(listenerCalled.get()); + } + + @Test + void testRemoveDataListener() { + CustomAsyncTransportInstance instance = new CustomAsyncTransportInstance(); + + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + instance.registerDataListener(() -> listenerCalled.set(true)); + instance.removeDataListener(); + + // After removing, data events should not call the listener + instance.simulateDataAvailable(); + + assertFalse(listenerCalled.get()); + } + + /** + * Minimal implementation that uses the default disconnect listener methods (no-op) + */ + private static class MinimalAsyncTransportInstance implements AsyncTransportInstance<TransportConfiguration> { + private Runnable dataListener; + + @Override + public TransportConfiguration getConfiguration() { + return null; + } + + @Override + public void registerDataListener(Runnable listener) { + this.dataListener = listener; + } + + @Override + public void removeDataListener() { + this.dataListener = null; + } + + // Note: registerDisconnectListener and removeDisconnectListener use default implementations + + @Override + public int getNumBytesAvailable() { + return 0; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public byte[] peekReadableBytes(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public byte[] read(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public void write(byte[] bytes) throws TransportException { + } + + @Override + public void close() throws TransportException { + } + } + + /** + * Implementation that provides actual disconnect listener functionality for testing + */ + private static class CustomAsyncTransportInstance implements AsyncTransportInstance<TransportConfiguration> { + private Runnable dataListener; + private Consumer<Throwable> disconnectListener; + + @Override + public TransportConfiguration getConfiguration() { + return null; + } + + @Override + public void registerDataListener(Runnable listener) { + this.dataListener = listener; + } + + @Override + public void removeDataListener() { + this.dataListener = null; + } + + @Override + public void registerDisconnectListener(Consumer<Throwable> listener) { + this.disconnectListener = listener; + } + + @Override + public void removeDisconnectListener() { + this.disconnectListener = null; + } + + public void simulateDataAvailable() { + if (dataListener != null) { + dataListener.run(); + } + } + + public void simulateDisconnect(Throwable cause) { + if (disconnectListener != null) { + disconnectListener.accept(cause); + } + } + + @Override + public int getNumBytesAvailable() { + return 0; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public byte[] peekReadableBytes(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public byte[] read(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public void write(byte[] bytes) throws TransportException { + } + + @Override + public void close() throws TransportException { + } + } +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/BaseTransportInstanceTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/BaseTransportInstanceTest.java new file mode 100644 index 0000000000..ecded68366 --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/BaseTransportInstanceTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; +import org.apache.plc4x.java.utils.auditlog.api.AuditLog; +import org.apache.plc4x.java.utils.auditlog.api.AuditLogEventType; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class BaseTransportInstanceTest { + + @Test + void testConstructor_shouldStoreConfigAndAuditLog() { + // Arrange + TransportConfiguration config = mock(TransportConfiguration.class); + AuditLog auditLog = mock(AuditLog.class); + when(config.toString()).thenReturn("test-config-string"); + + // Act + TestBaseTransport transport = new TestBaseTransport(config, auditLog); + + // Assert + assertNotNull(transport); + assertEquals(config, transport.getConfiguration()); + assertEquals(auditLog, transport.getAuditLog()); + } + + @Test + void testConstructor_shouldWriteAuditLogEntry() { + // Arrange + TransportConfiguration config = mock(TransportConfiguration.class); + AuditLog auditLog = mock(AuditLog.class); + when(config.toString()).thenReturn("test-config-string"); + + // Act + new TestBaseTransport(config, auditLog); + + // Assert - verify audit log was called with correct parameters + ArgumentCaptor<AuditLogEventType> eventTypeCaptor = ArgumentCaptor.forClass(AuditLogEventType.class); + ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<Object> configObjectCaptor = ArgumentCaptor.forClass(Object.class); + + verify(auditLog).write(eventTypeCaptor.capture(), messageCaptor.capture(), configObjectCaptor.capture()); + + assertEquals(AuditLogEventType.SYSTEM, eventTypeCaptor.getValue()); + assertEquals("Creating Transport with config", messageCaptor.getValue()); + assertEquals(config, configObjectCaptor.getValue()); + } + + @Test + void testGetConfiguration_shouldReturnConstructorConfig() { + // Arrange + TransportConfiguration config = mock(TransportConfiguration.class); + AuditLog auditLog = mock(AuditLog.class); + when(config.toString()).thenReturn("test-config"); + + TestBaseTransport transport = new TestBaseTransport(config, auditLog); + + // Act + TransportConfiguration result = transport.getConfiguration(); + + // Assert + assertSame(config, result); + } + + @Test + void testGetAuditLog_shouldReturnConstructorAuditLog() { + // Arrange + TransportConfiguration config = mock(TransportConfiguration.class); + AuditLog auditLog = mock(AuditLog.class); + when(config.toString()).thenReturn("test-config"); + + TestBaseTransport transport = new TestBaseTransport(config, auditLog); + + // Act + AuditLog result = transport.getAuditLog(); + + // Assert + assertSame(auditLog, result); + } + + @Test + void testConstructor_withNullConfig_shouldNotThrow() { + // Arrange + AuditLog auditLog = mock(AuditLog.class); + + // Act & Assert - should handle null config gracefully or throw NPE + // The actual behavior depends on the implementation requirements + assertThrows(NullPointerException.class, () -> new TestBaseTransport(null, auditLog)); + } + + @Test + void testConstructor_withNullAuditLog_shouldThrow() { + // Arrange + TransportConfiguration config = mock(TransportConfiguration.class); + when(config.toString()).thenReturn("test-config"); + + // Act & Assert + assertThrows(NullPointerException.class, () -> new TestBaseTransport(config, null)); + } + + /** + * Concrete test implementation of BaseTransport for testing purposes + */ + private static class TestBaseTransport extends BaseTransportInstance<TransportConfiguration> { + public TestBaseTransport(TransportConfiguration transportConfig, AuditLog auditLog) { + super(transportConfig, auditLog); + } + + @Override + public boolean isOpen() { + // Dummy implementation + return false; + } + + @Override + public int getNumBytesAvailable() throws TransportException { + // Dummy implementation + return 0; + } + + @Override + public byte[] peekReadableBytes(int numBytes) throws TransportException { + // Dummy implementation + return new byte[0]; + } + + @Override + public byte[] read(int numBytes) throws TransportException { + // Dummy implementation + return new byte[0]; + } + + @Override + public void write(byte[] bytes) throws TransportException { + // Dummy implementation + } + + @Override + public void close() throws TransportException { + // Dummy implementation + } + } + +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/BlockingTransportInstanceTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/BlockingTransportInstanceTest.java new file mode 100644 index 0000000000..161221f5de --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/BlockingTransportInstanceTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +class BlockingTransportInstanceTest { + + @Test + void testWaitForBytes_success() throws Exception { + // Create a test implementation that simulates bytes becoming available + BlockingTransportInstance<TransportConfiguration> instance = new TestBlockingTransportInstance(10); + + // Should complete immediately since 10 bytes are available + assertDoesNotThrow(() -> instance.waitForBytes(5, Duration.ofSeconds(1))); + } + + @Test + void testWaitForBytes_timeout() { + // Create a test implementation with only 5 bytes available + BlockingTransportInstance<TransportConfiguration> instance = new TestBlockingTransportInstance(5); + + // Should timeout waiting for 10 bytes + TimeoutException exception = assertThrows(TimeoutException.class, () -> { + instance.waitForBytes(10, Duration.ofMillis(100)); + }); + + assertTrue(exception.getMessage().contains("Timeout waiting for 10 bytes")); + } + + @Test + void testWaitForBytes_exactMatch() throws Exception { + // Create a test implementation with exactly the number of bytes needed + BlockingTransportInstance<TransportConfiguration> instance = new TestBlockingTransportInstance(10); + + // Should complete immediately + assertDoesNotThrow(() -> instance.waitForBytes(10, Duration.ofSeconds(1))); + } + + @Test + void testWaitForBytes_zeroBytes() throws Exception { + // Create a test implementation with no bytes + BlockingTransportInstance<TransportConfiguration> instance = new TestBlockingTransportInstance(0); + + // Waiting for 0 bytes should succeed immediately + assertDoesNotThrow(() -> instance.waitForBytes(0, Duration.ofMillis(100))); + } + + @Test + void testWaitForBytes_bytesIncreasingGradually() throws Exception { + // Test implementation that simulates bytes becoming available gradually + GradualBytesTransportInstance instance = new GradualBytesTransportInstance(); + + // Should eventually get 10 bytes + assertDoesNotThrow(() -> instance.waitForBytes(10, Duration.ofSeconds(2))); + } + + /** + * Test implementation that always reports a fixed number of available bytes + */ + private static class TestBlockingTransportInstance implements BlockingTransportInstance<TransportConfiguration> { + private final int availableBytes; + + TestBlockingTransportInstance(int availableBytes) { + this.availableBytes = availableBytes; + } + + @Override + public TransportConfiguration getConfiguration() { + return null; + } + + @Override + public void waitForData(Duration timeout) throws TransportException, TimeoutException, InterruptedException { + // Simulate a short wait + Thread.sleep(10); + } + + @Override + public int getNumBytesAvailable() { + return availableBytes; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public byte[] peekReadableBytes(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public byte[] read(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public void write(byte[] bytes) throws TransportException { + } + + @Override + public void close() throws TransportException { + } + } + + /** + * Test implementation that simulates bytes becoming available gradually + */ + private static class GradualBytesTransportInstance implements BlockingTransportInstance<TransportConfiguration> { + private final AtomicInteger availableBytes = new AtomicInteger(0); + + @Override + public TransportConfiguration getConfiguration() { + return null; + } + + @Override + public void waitForData(Duration timeout) throws TransportException, TimeoutException, InterruptedException { + // Simulate data arriving gradually + Thread.sleep(50); + availableBytes.addAndGet(3); + } + + @Override + public int getNumBytesAvailable() { + return availableBytes.get(); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public byte[] peekReadableBytes(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public byte[] read(int numBytes) throws TransportException { + return new byte[0]; + } + + @Override + public void write(byte[] bytes) throws TransportException { + } + + @Override + public void close() throws TransportException { + } + } +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/DefaultTransportManagerTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/DefaultTransportManagerTest.java new file mode 100644 index 0000000000..ea0e393938 --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/DefaultTransportManagerTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; + +class DefaultTransportManagerTest { + + @Test + void testDefaultConstructor() { + DefaultTransportManager manager = new DefaultTransportManager(); + assertNotNull(manager); + assertNotNull(manager.classLoader); + } + + @Test + void testConstructorWithClassLoader() { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + DefaultTransportManager manager = new DefaultTransportManager(classLoader); + + assertNotNull(manager); + assertEquals(classLoader, manager.classLoader); + } + + @Test + void testGetTransport_returnsEmptyWhenNoTransportsAvailable() { + DefaultTransportManager manager = new DefaultTransportManager(); + + Optional<Transport> result = manager.getTransport("tcp"); + assertFalse(result.isPresent()); + } + + @Test + void testGetTransport_returnsEmptyForUnknownCode() { + DefaultTransportManager manager = new DefaultTransportManager(); + + Optional<Transport> result = manager.getTransport("unknown-transport-code"); + assertFalse(result.isPresent()); + } + + @Test + void testGetTransport_returnsEmptyForNullCode() { + DefaultTransportManager manager = new DefaultTransportManager(); + + Optional<Transport> result = manager.getTransport(null); + assertFalse(result.isPresent()); + } + + @Test + void testConstructorWithCustomClassLoader() { + // Create a custom classloader that won't find any transports + ClassLoader emptyClassLoader = new ClassLoader(null) { + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + throw new ClassNotFoundException(name); + } + }; + + DefaultTransportManager manager = new DefaultTransportManager(emptyClassLoader); + assertNotNull(manager); + assertEquals(emptyClassLoader, manager.classLoader); + } +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/OsgiTransportManagerTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/OsgiTransportManagerTest.java new file mode 100644 index 0000000000..fe3720f0e5 --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/OsgiTransportManagerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.when; + +class OsgiTransportManagerTest { + + private OsgiTransportManager manager; + private Transport<?> mockTransport; + + @BeforeEach + void setUp() { + manager = new OsgiTransportManager(); + mockTransport = Mockito.mock(Transport.class); + when(mockTransport.getTransportCode()).thenReturn("tcp"); + when(mockTransport.getTransportName()).thenReturn("TCP Transport"); + } + + @Test + void testGetTransport_whenTransportNotRegistered() { + Optional<Transport> result = manager.getTransport("tcp"); + assertFalse(result.isPresent()); + } + + @Test + void testBindTransport_registersTransport() { + manager.bindTransport(mockTransport); + + Optional<Transport> result = manager.getTransport("tcp"); + assertTrue(result.isPresent()); + assertEquals(mockTransport, result.get()); + } + + @Test + void testBindTransport_duplicateTransportIgnored() { + Transport<?> mockTransport2 = Mockito.mock(Transport.class); + when(mockTransport2.getTransportCode()).thenReturn("tcp"); + when(mockTransport2.getTransportName()).thenReturn("TCP Transport 2"); + + manager.bindTransport(mockTransport); + manager.bindTransport(mockTransport2); + + Optional<Transport> result = manager.getTransport("tcp"); + assertTrue(result.isPresent()); + assertEquals(mockTransport, result.get()); + } + + @Test + void testUnbindTransport_removesTransport() { + manager.bindTransport(mockTransport); + + Optional<Transport> result = manager.getTransport("tcp"); + assertTrue(result.isPresent()); + + manager.unbindTransport(mockTransport); + + result = manager.getTransport("tcp"); + assertFalse(result.isPresent()); + } + + @Test + void testBindMultipleTransports() { + Transport<?> mockTransport2 = Mockito.mock(Transport.class); + when(mockTransport2.getTransportCode()).thenReturn("udp"); + when(mockTransport2.getTransportName()).thenReturn("UDP Transport"); + + manager.bindTransport(mockTransport); + manager.bindTransport(mockTransport2); + + Optional<Transport> tcpResult = manager.getTransport("tcp"); + Optional<Transport> udpResult = manager.getTransport("udp"); + + assertTrue(tcpResult.isPresent()); + assertTrue(udpResult.isPresent()); + assertEquals(mockTransport, tcpResult.get()); + assertEquals(mockTransport2, udpResult.get()); + } + + @Test + void testGetTransport_returnsEmptyForUnknownCode() { + manager.bindTransport(mockTransport); + + Optional<Transport> result = manager.getTransport("unknown"); + assertFalse(result.isPresent()); + } +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/RingBufferTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/RingBufferTest.java new file mode 100644 index 0000000000..2f0fab494c --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/RingBufferTest.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeEach; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.*; + +import static org.junit.jupiter.api.Assertions.*; + +class RingBufferTest { + + private RingBuffer buffer; + + @BeforeEach + void setUp() { + buffer = new RingBuffer(10); + } + + @Test + void testConstructorWithValidCapacity() { + RingBuffer rb = new RingBuffer(100); + assertEquals(100, rb.capacity()); + assertEquals(0, rb.availableForReading()); + assertEquals(100, rb.remainingForWriting()); + } + + @Test + void testConstructorWithInvalidCapacity() { + assertThrows(IllegalArgumentException.class, () -> new RingBuffer(0)); + assertThrows(IllegalArgumentException.class, () -> new RingBuffer(-1)); + } + + @Test + void testWriteAndRead() { + byte[] data = {1, 2, 3, 4, 5}; + int written = buffer.write(data); + + assertEquals(5, written); + assertEquals(5, buffer.availableForReading()); + assertEquals(5, buffer.remainingForWriting()); + + byte[] read = buffer.read(5); + assertArrayEquals(data, read); + assertEquals(0, buffer.availableForReading()); + assertEquals(10, buffer.remainingForWriting()); + } + + @Test + void testWriteWithOffset() { + byte[] data = {1, 2, 3, 4, 5, 6, 7, 8}; + int written = buffer.write(data, 2, 4); + + assertEquals(4, written); + assertEquals(4, buffer.availableForReading()); + + byte[] read = buffer.read(4); + assertArrayEquals(new byte[]{3, 4, 5, 6}, read); + } + + @Test + void testWriteMoreThanCapacity() { + byte[] data = new byte[15]; + for (int i = 0; i < 15; i++) { + data[i] = (byte) i; + } + + // TODO: This should fail and throw an exception + int written = buffer.write(data); + assertEquals(10, written); // Only capacity amount written + assertEquals(10, buffer.availableForReading()); + assertEquals(0, buffer.remainingForWriting()); + } + + @Test + void testWriteNull() { + assertThrows(IllegalArgumentException.class, () -> buffer.write((byte[]) null)); + assertThrows(IllegalArgumentException.class, () -> buffer.write(null, 0, 0)); + } + + @Test + void testWriteWithInvalidOffsetLength() { + byte[] data = {1, 2, 3, 4, 5}; + assertThrows(IllegalArgumentException.class, () -> buffer.write(data, -1, 3)); + assertThrows(IllegalArgumentException.class, () -> buffer.write(data, 0, -1)); + assertThrows(IllegalArgumentException.class, () -> buffer.write(data, 0, 10)); + assertThrows(IllegalArgumentException.class, () -> buffer.write(data, 3, 5)); + } + + @Test + void testPeek() { + byte[] data = {1, 2, 3, 4, 5}; + buffer.write(data); + + byte[] peeked1 = buffer.peek(3); + assertArrayEquals(new byte[]{1, 2, 3}, peeked1); + assertEquals(5, buffer.availableForReading()); // Available unchanged + + byte[] peeked2 = buffer.peek(3); + assertArrayEquals(new byte[]{1, 2, 3}, peeked2); + assertEquals(5, buffer.availableForReading()); // Still unchanged + + byte[] read = buffer.read(5); + assertArrayEquals(data, read); + } + + @Test + void testPeekMoreThanAvailableForReading() { + byte[] data = {1, 2, 3}; + buffer.write(data); + + byte[] peeked = buffer.peek(10); + assertEquals(3, peeked.length); + assertArrayEquals(data, peeked); + } + + @Test + void testPeekEmpty() { + byte[] peeked = buffer.peek(5); + assertEquals(0, peeked.length); + } + + @Test + void testPeekNegative() { + assertThrows(IllegalArgumentException.class, () -> buffer.peek(-1)); + } + + @Test + void testReadMoreThanAvailableForReading() { + byte[] data = {1, 2, 3}; + buffer.write(data); + + byte[] read = buffer.read(10); + assertEquals(3, read.length); + assertArrayEquals(data, read); + assertEquals(0, buffer.availableForReading()); + } + + @Test + void testReadEmpty() { + byte[] read = buffer.read(5); + assertEquals(0, read.length); + } + + @Test + void testReadNegative() { + assertThrows(IllegalArgumentException.class, () -> buffer.read(-1)); + } + + @Test + void testReadZero() { + byte[] data = {1, 2, 3}; + buffer.write(data); + + byte[] read = buffer.read(0); + assertEquals(0, read.length); + assertEquals(3, buffer.availableForReading()); + } + + @Test + void testClear() { + byte[] data = {1, 2, 3, 4, 5}; + buffer.write(data); + + assertEquals(5, buffer.availableForReading()); + buffer.clear(); + assertEquals(0, buffer.availableForReading()); + assertEquals(10, buffer.remainingForWriting()); + } + + @Test + void testSkip() { + byte[] data = {1, 2, 3, 4, 5, 6, 7, 8}; + buffer.write(data); + + int skipped = buffer.skip(3); + assertEquals(3, skipped); + assertEquals(5, buffer.availableForReading()); + + byte[] read = buffer.read(5); + assertArrayEquals(new byte[]{4, 5, 6, 7, 8}, read); + } + + @Test + void testSkipMoreThanAvailableForReading() { + byte[] data = {1, 2, 3}; + buffer.write(data); + + int skipped = buffer.skip(10); + assertEquals(3, skipped); + assertEquals(0, buffer.availableForReading()); + } + + @Test + void testSkipNegative() { + assertThrows(IllegalArgumentException.class, () -> buffer.skip(-1)); + } + + @Test + void testWrapAround() { + // Fill buffer + byte[] data1 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + buffer.write(data1); + + // Read some to create space at the beginning + buffer.read(6); + assertEquals(4, buffer.availableForReading()); + assertEquals(6, buffer.remainingForWriting()); + + // Write data that will wrap around + byte[] data2 = {11, 12, 13, 14, 15, 16}; + int written = buffer.write(data2); + assertEquals(6, written); + assertEquals(10, buffer.availableForReading()); + + // Read all and verify order + byte[] read = buffer.read(10); + assertArrayEquals(new byte[]{7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, read); + } + + @Test + void testWrapAroundPeek() { + // Fill buffer + buffer.write(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + + // Read to create wrap-around condition + buffer.read(7); + + // Write data that wraps + buffer.write(new byte[]{11, 12, 13, 14, 15, 16, 17}); + + // Peek across the wrap boundary + byte[] peeked = buffer.peek(10); + assertArrayEquals(new byte[]{8, 9, 10, 11, 12, 13, 14, 15, 16, 17}, peeked); + } + + @Test + void testCompactNoWrapAround() { + // Write and read to move positions forward + buffer.write(new byte[]{1, 2, 3, 4, 5}); + buffer.read(2); + + // Compact should move data to beginning + buffer.compact(); + + assertEquals(3, buffer.availableForReading()); + byte[] read = buffer.read(3); + assertArrayEquals(new byte[]{3, 4, 5}, read); + } + + @Test + void testCompactWithWrapAround() { + // Create wrap-around condition + buffer.write(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + buffer.read(7); + buffer.write(new byte[]{11, 12, 13, 14, 15, 16, 17}); + + // Compact should handle wrap-around + buffer.compact(); + + assertEquals(10, buffer.availableForReading()); + byte[] read = buffer.read(10); + assertArrayEquals(new byte[]{8, 9, 10, 11, 12, 13, 14, 15, 16, 17}, read); + } + + @Test + void testCompactEmpty() { + buffer.compact(); + assertEquals(0, buffer.availableForReading()); + assertEquals(10, buffer.remainingForWriting()); + } + + @Test + void testMultipleWriteReadCycles() { + for (int i = 0; i < 100; i++) { + byte[] data = {(byte) i, (byte) (i + 1), (byte) (i + 2)}; + buffer.write(data); + + byte[] read = buffer.read(3); + assertArrayEquals(data, read); + assertEquals(0, buffer.availableForReading()); + } + } + + @Test + void testPartialReadsAndWrites() { + buffer.write(new byte[]{1, 2, 3, 4, 5}); + buffer.read(2); + buffer.write(new byte[]{6, 7, 8}); + buffer.read(3); + buffer.write(new byte[]{9, 10, 11, 12}); + + assertEquals(7, buffer.availableForReading()); + byte[] read = buffer.read(7); + assertArrayEquals(new byte[]{6, 7, 8, 9, 10, 11, 12}, read); + } + + @Test + void testConcurrentWriteAndRead() throws InterruptedException { + RingBuffer concurrentBuffer = new RingBuffer(1000); + int numOperations = 1000; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(2); + List<Throwable> errors = new CopyOnWriteArrayList<>(); + + // Writer thread + Thread writer = new Thread(() -> { + try { + startLatch.await(); + Random random = new Random(42); + for (int i = 0; i < numOperations; i++) { + byte[] data = new byte[random.nextInt(10) + 1]; + random.nextBytes(data); + while (concurrentBuffer.write(data) < data.length) { + Thread.sleep(1); + } + } + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }); + + // Reader thread + Thread reader = new Thread(() -> { + try { + startLatch.await(); + Random random = new Random(43); + for (int i = 0; i < numOperations; i++) { + int toRead = random.nextInt(10) + 1; + while (concurrentBuffer.availableForReading() < toRead) { + Thread.sleep(1); + } + byte[] data = concurrentBuffer.read(toRead); + assertNotNull(data); + } + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }); + + writer.start(); + reader.start(); + startLatch.countDown(); + + assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Concurrent test timed out"); + + if (!errors.isEmpty()) { + throw new AssertionError("Concurrent test failed: " + errors.get(0).getMessage(), errors.get(0)); + } + } + + @Test + void testMultipleReaders() throws InterruptedException, ExecutionException { + RingBuffer sharedBuffer = new RingBuffer(1000); + sharedBuffer.write(new byte[1000]); // Fill the buffer + + int numReaders = 10; + ExecutorService executor = Executors.newFixedThreadPool(numReaders); + List<Future<Integer>> futures = new ArrayList<>(); + + for (int i = 0; i < numReaders; i++) { + futures.add(executor.submit(() -> { + int totalRead = 0; + while (sharedBuffer.availableForReading() > 0) { + byte[] data = sharedBuffer.read(10); + totalRead += data.length; + } + return totalRead; + })); + } + + int totalReadByAll = 0; + for (Future<Integer> future : futures) { + totalReadByAll += future.get(); + } + + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + + assertEquals(1000, totalReadByAll); + assertEquals(0, sharedBuffer.availableForReading()); + } + + @Test + void testStressTest() { + RingBuffer stressBuffer = new RingBuffer(100); + Random random = new Random(12345); + + for (int i = 0; i < 10000; i++) { + // Random operation + int op = random.nextInt(5); + switch (op) { + case 0: // Write + byte[] writeData = new byte[random.nextInt(20)]; + random.nextBytes(writeData); + stressBuffer.write(writeData); + break; + case 1: // Read + stressBuffer.read(random.nextInt(20)); + break; + case 2: // Peek + stressBuffer.peek(random.nextInt(20)); + break; + case 3: // Skip + stressBuffer.skip(random.nextInt(20)); + break; + case 4: // Clear + if (random.nextInt(10) == 0) { + stressBuffer.clear(); + } + break; + } + + // Verify invariants + assertTrue(stressBuffer.availableForReading() >= 0); + assertTrue(stressBuffer.availableForReading() <= stressBuffer.capacity()); + assertTrue(stressBuffer.remainingForWriting() >= 0); + assertTrue(stressBuffer.remainingForWriting() <= stressBuffer.capacity()); + assertEquals(stressBuffer.capacity(), stressBuffer.availableForReading() + stressBuffer.remainingForWriting()); + } + } + + // Tests for ByteBuffer write method + + @Test + void testWriteFromByteBuffer() { + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(5); + byteBuffer.put(new byte[]{1, 2, 3, 4, 5}); + byteBuffer.flip(); + + int written = buffer.write(byteBuffer); + + assertEquals(5, written); + assertEquals(5, buffer.availableForReading()); + assertEquals(5, buffer.remainingForWriting()); + assertEquals(0, byteBuffer.remaining()); // ByteBuffer should be consumed + + byte[] read = buffer.read(5); + assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, read); + } + + @Test + void testWriteFromDirectByteBuffer() { + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocateDirect(5); + byteBuffer.put(new byte[]{10, 20, 30, 40, 50}); + byteBuffer.flip(); + + int written = buffer.write(byteBuffer); + + assertEquals(5, written); + assertEquals(5, buffer.availableForReading()); + assertEquals(0, byteBuffer.remaining()); + + byte[] read = buffer.read(5); + assertArrayEquals(new byte[]{10, 20, 30, 40, 50}, read); + } + + @Test + void testWriteFromByteBufferWithWrapAround() { + // Fill the buffer partially + buffer.write(new byte[]{1, 2, 3, 4, 5, 6, 7, 8}); + buffer.read(5); // Read 5 bytes to create space at the beginning + + // Now write 7 bytes which will wrap around + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(7); + byteBuffer.put(new byte[]{10, 20, 30, 40, 50, 60, 70}); + byteBuffer.flip(); + + int written = buffer.write(byteBuffer); + + assertEquals(7, written); + assertEquals(10, buffer.availableForReading()); // 3 old + 7 new + assertEquals(0, byteBuffer.remaining()); + + // Read all and verify + byte[] read = buffer.read(10); + assertArrayEquals(new byte[]{6, 7, 8, 10, 20, 30, 40, 50, 60, 70}, read); + } + + @Test + void testWriteFromByteBufferWhenBufferFull() { + // Fill the buffer + buffer.write(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(5); + byteBuffer.put(new byte[]{11, 12, 13, 14, 15}); + byteBuffer.flip(); + + int written = buffer.write(byteBuffer); + + assertEquals(0, written); // Nothing written + assertEquals(10, buffer.availableForReading()); + assertEquals(5, byteBuffer.remaining()); // ByteBuffer unchanged + } + + @Test + void testWriteFromByteBufferPartial() { + // Fill buffer with 7 bytes, leaving only 3 bytes available + buffer.write(new byte[]{1, 2, 3, 4, 5, 6, 7}); + + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(5); + byteBuffer.put(new byte[]{10, 20, 30, 40, 50}); + byteBuffer.flip(); + + int written = buffer.write(byteBuffer); + + assertEquals(3, written); // Only 3 bytes written + assertEquals(10, buffer.availableForReading()); + assertEquals(2, byteBuffer.remaining()); // 2 bytes left in ByteBuffer + } + + @Test + void testWriteFromEmptyByteBuffer() { + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(0); + + int written = buffer.write(byteBuffer); + + assertEquals(0, written); + assertEquals(0, buffer.availableForReading()); + } + + @Test + void testWriteFromNullByteBuffer() { + assertThrows(IllegalArgumentException.class, () -> buffer.write((java.nio.ByteBuffer) null)); + } + + @Test + void testWriteFromByteBufferWithLimit() { + java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(10); + byteBuffer.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + byteBuffer.flip(); + byteBuffer.limit(5); // Limit to first 5 bytes + + int written = buffer.write(byteBuffer); + + assertEquals(5, written); + assertEquals(5, buffer.availableForReading()); + assertEquals(0, byteBuffer.remaining()); + + byte[] read = buffer.read(5); + assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, read); + } +} \ No newline at end of file diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/TransportTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/TransportTest.java new file mode 100644 index 0000000000..c73dcd2477 --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/TransportTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api; + +import org.apache.plc4x.java.transports.api.config.TransportConfiguration; +import org.apache.plc4x.java.transports.api.exceptions.TransportException; +import org.apache.plc4x.java.utils.auditlog.api.AuditLog; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class TransportTest { + + @Test + void testDefaultGetTransportConfigType() { + // Create a minimal implementation to test the default method + Transport<TransportConfiguration> transport = new Transport<TransportConfiguration>() { + @Override + public String getTransportCode() { + return "test"; + } + + @Override + public String getTransportName() { + return "Test Transport"; + } + + @Override + public TransportInstance<TransportConfiguration> createTransportInstance(String transportUrl, TransportConfiguration configuration, AuditLog auditLog) throws TransportException { + return null; + } + }; + + // The default implementation returns null + assertNull(transport.getTransportConfigType()); + } + + @Test + void testTransportCodeAndName() { + Transport<TransportConfiguration> transport = new Transport<TransportConfiguration>() { + @Override + public String getTransportCode() { + return "tcp"; + } + + @Override + public String getTransportName() { + return "TCP Transport"; + } + + @Override + public TransportInstance<TransportConfiguration> createTransportInstance(String transportUrl, TransportConfiguration configuration, AuditLog auditLog) throws TransportException { + return null; + } + }; + + assertEquals("tcp", transport.getTransportCode()); + assertEquals("TCP Transport", transport.getTransportName()); + } + + @Test + void testCustomGetTransportConfigType() { + Transport<TransportConfiguration> transport = new Transport<TransportConfiguration>() { + @Override + public String getTransportCode() { + return "test"; + } + + @Override + public String getTransportName() { + return "Test Transport"; + } + + @Override + public Class<TransportConfiguration> getTransportConfigType() { + return TransportConfiguration.class; + } + + @Override + public TransportInstance<TransportConfiguration> createTransportInstance(String transportUrl, TransportConfiguration configuration, AuditLog auditLog) throws TransportException { + return null; + } + }; + + assertEquals(TransportConfiguration.class, transport.getTransportConfigType()); + } +} diff --git a/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/exceptions/TransportExceptionTest.java b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/exceptions/TransportExceptionTest.java new file mode 100644 index 0000000000..dfb44d94af --- /dev/null +++ b/plc4j/transports/api/src/test/java/org/apache/plc4x/java/transports/api/exceptions/TransportExceptionTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.transports.api.exceptions; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class TransportExceptionTest { + + @Test + void testConstructorWithMessage() { + String message = "Transport error occurred"; + TransportException exception = new TransportException(message); + + assertEquals(message, exception.getMessage()); + assertNull(exception.getCause()); + } + + @Test + void testConstructorWithMessageAndCause() { + String message = "Transport error occurred"; + Throwable cause = new RuntimeException("Root cause"); + TransportException exception = new TransportException(message, cause); + + assertEquals(message, exception.getMessage()); + assertEquals(cause, exception.getCause()); + } + + @Test + void testExceptionCanBeThrown() { + assertThrows(TransportException.class, () -> { + throw new TransportException("Test exception"); + }); + } + + @Test + void testExceptionCanBeCaught() { + try { + throw new TransportException("Test message"); + } catch (TransportException e) { + assertEquals("Test message", e.getMessage()); + } + } +} diff --git a/plc4j/transports/pom.xml b/plc4j/transports/pom.xml index 9a194a342f..a309030827 100644 --- a/plc4j/transports/pom.xml +++ b/plc4j/transports/pom.xml @@ -38,6 +38,7 @@ </properties> <modules> + <module>api</module> <!--module>can</module> <module>pcap-replay</module> <module>pcap-shared</module> diff --git a/pom.xml b/pom.xml index cb4e23f9e2..7715d2b483 100644 --- a/pom.xml +++ b/pom.xml @@ -483,6 +483,13 @@ <version>${mockito.version}</version> </dependency> + <!-- OSGi Dependencies --> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.component.annotations</artifactId> + <version>1.5.1</version> + </dependency> + <!-- TODO: Eliminate the usage of AssertJ (OPM and OPC-UA Driver) --> <dependency> <groupId>org.assertj</groupId>
