This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 8e841980 PROTON-2899 Updates ahead of move to Netty 4.2.x 8e841980 is described below commit 8e8419803c4c666dd3fe8e3f3a141ab7afe34b21 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Mon Jul 14 15:53:55 2025 -0400 PROTON-2899 Updates ahead of move to Netty 4.2.x Stabilize some tests with intermittent failures and replace use of now deprecated APIs with their non-deprecated versions ahead of move to a newer release where those could be removed. Other minor cleanups of the code to prepare for future netty updates. --- .../client/transport/netty4/TcpTransport.java | 11 ++- .../transport/netty4/WebSocketTransport.java | 2 +- .../client/transport/netty4/NettyServer.java | 8 +- .../client/transport/netty4/TcpTransportTest.java | 3 +- .../protonj2/test/driver/ProtonTestServer.java | 4 + .../test/driver/netty/netty4/Netty4Client.java | 2 +- .../test/driver/netty/netty4/Netty4Server.java | 2 +- .../protonj2/test/driver/ProtonTestClientTest.java | 3 + .../qpid/protonj2/test/driver/utils/Wait.java | 105 +++++++++++++++++++++ 9 files changed, 134 insertions(+), 6 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java index a21f06e6..13d2c55e 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -154,7 +155,15 @@ public class TcpTransport implements Transport { configureNetty(bootstrap, options); - bootstrap.connect(getHost(), getPort()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + bootstrap.connect(getHost(), getPort()).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + handleTransportFailure(future.channel(), future.cause()); + } + } + }); return this; } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java index 5d3aae6c..9e28d65f 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java @@ -153,7 +153,7 @@ public class WebSocketTransport extends TcpTransport { pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(8192)); if (options.webSocketCompression()) { - pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE); + pipeline.addLast(new WebSocketClientCompressionHandler(0)); } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java index 87fbaac3..52b7273e 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java @@ -224,7 +224,7 @@ public abstract class NettyServer implements AutoCloseable { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); if (isUseWebSocketCompression()) { - ch.pipeline().addLast(new WebSocketServerCompressionHandler()); + ch.pipeline().addLast(new WebSocketServerCompressionHandler(0)); } ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize)); } @@ -250,6 +250,8 @@ public abstract class NettyServer implements AutoCloseable { serverChannel.close().sync(); } catch (InterruptedException e) { LOG.trace("Error on server channel close:", e); + } finally { + serverChannel = null; } // Shut down all event loops to terminate all threads. @@ -261,6 +263,10 @@ public abstract class NettyServer implements AutoCloseable { LOG.trace("Shutting down worker group"); workerGroup.shutdownGracefully(0, timeout, TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout); LOG.trace("Worker group shut down"); + + // allow a chance for full termination + bossGroup.awaitTermination(10, TimeUnit.MILLISECONDS); + workerGroup.awaitTermination(10, TimeUnit.MILLISECONDS); } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java index b7e2b669..8acea840 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -79,7 +80,7 @@ public class TcpTransportTest extends ImperativeClientTestCase { protected volatile boolean transportInitialized; protected volatile boolean transportConnected; protected volatile boolean transportErrored; - protected final List<Throwable> exceptions = new ArrayList<>(); + protected final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>()); protected final List<ProtonBuffer> data = new ArrayList<>(); protected final AtomicInteger bytesRead = new AtomicInteger(); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java index 9b7d6420..a3b55997 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java @@ -182,6 +182,10 @@ public class ProtonTestServer extends ProtonTestPeer { return server.isWSCompressionActive(); } + public boolean hasClientConnection() { + return server.hasClientConnection(); + } + @Override public AMQPTestDriver getDriver() { return driver; diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java index b31c6e90..baef09ea 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java @@ -507,7 +507,7 @@ public final class Netty4Client implements NettyClient { channel.pipeline().addLast(new HttpObjectAggregator(8192)); if (options.isWebSocketCompression()) { channel.pipeline().addLast(new ClientWSCompressionObserver()); - channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE); + channel.pipeline().addLast(new WebSocketClientCompressionHandler(0)); } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java index 94a891dc..bae6f674 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java @@ -284,7 +284,7 @@ public final class Netty4Server implements NettyServer { ch.pipeline().addLast(new HttpObjectAggregator(65536)); if (options.isWebSocketCompression()) { ch.pipeline().addLast(new ServerWSCompressionObserver()); - ch.pipeline().addLast(new WebSocketServerCompressionHandler()); + ch.pipeline().addLast(new WebSocketServerCompressionHandler(0)); } ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize)); } diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java index e4036867..95192f44 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode; import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader; import org.apache.qpid.protonj2.test.driver.codec.transport.Open; import org.apache.qpid.protonj2.test.driver.utils.TestPeerTestsBase; +import org.apache.qpid.protonj2.test.driver.utils.Wait; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -129,6 +130,8 @@ class ProtonTestClientTest extends TestPeerTestsBase { client.waitForScriptToComplete(5, TimeUnit.SECONDS); } + Wait.assertFalse(() -> peer.hasClientConnection()); + try (ProtonTestClient client = new ProtonTestClient()) { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java new file mode 100644 index 00000000..9e3d0d35 --- /dev/null +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.protonj2.test.driver.utils; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.TimeUnit; + +public class Wait { + + public static final long MAX_WAIT_MILLIS = 10 * 1000; + public static final long SLEEP_MILLIS = 50; + public static final String DEFAULT_FAILURE_MESSAGE = "Expected condition was not met"; + + @FunctionalInterface + public interface Condition { + boolean isSatisfied() throws Exception; + } + + public static void assertTrue(Condition condition) { + assertTrue(DEFAULT_FAILURE_MESSAGE, condition); + } + + public static void assertFalse(Condition condition) throws Exception { + assertTrue(() -> !condition.isSatisfied()); + } + + public static void assertFalse(String failureMessage, Condition condition) { + assertTrue(failureMessage, () -> !condition.isSatisfied()); + } + + public static void assertFalse(String failureMessage, Condition condition, final long duration) { + assertTrue(failureMessage, () -> !condition.isSatisfied(), duration, SLEEP_MILLIS); + } + + public static void assertFalse(Condition condition, final long duration, final long sleep) { + assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(), duration, sleep); + } + + public static void assertTrue(Condition condition, final long duration) { + assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS); + } + + public static void assertTrue(String failureMessage, Condition condition) { + assertTrue(failureMessage, condition, MAX_WAIT_MILLIS); + } + + public static void assertTrue(String failureMessage, Condition condition, final long duration) { + assertTrue(failureMessage, condition, duration, SLEEP_MILLIS); + } + + public static void assertTrue(Condition condition, final long duration, final long sleep) throws Exception { + assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep); + } + + public static void assertTrue(String failureMessage, Condition condition, final long duration, final long sleep) { + boolean result = waitFor(condition, duration, sleep); + + if (!result) { + fail(failureMessage); + } + } + + public static boolean waitFor(Condition condition) throws Exception { + return waitFor(condition, MAX_WAIT_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long duration) throws Exception { + return waitFor(condition, duration, SLEEP_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long durationMillis, final long sleepMillis) { + try { + final long expiry = System.currentTimeMillis() + durationMillis; + boolean conditionSatisfied = condition.isSatisfied(); + + while (!conditionSatisfied && System.currentTimeMillis() < expiry) { + if (sleepMillis == 0) { + Thread.yield(); + } else { + TimeUnit.MILLISECONDS.sleep(sleepMillis); + } + conditionSatisfied = condition.isSatisfied(); + } + + return conditionSatisfied; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org