This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e841980 PROTON-2899 Updates ahead of move to Netty 4.2.x
8e841980 is described below

commit 8e8419803c4c666dd3fe8e3f3a141ab7afe34b21
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Mon Jul 14 15:53:55 2025 -0400

    PROTON-2899 Updates ahead of move to Netty 4.2.x
    
    Stabilize some tests that fail intermittently and replace uses of
    now-deprecated APIs with their non-deprecated equivalents ahead of the
    move to a newer release where they could be removed. Also includes other
    minor code cleanups to prepare for future Netty updates.
---
 .../client/transport/netty4/TcpTransport.java      |  11 ++-
 .../transport/netty4/WebSocketTransport.java       |   2 +-
 .../client/transport/netty4/NettyServer.java       |   8 +-
 .../client/transport/netty4/TcpTransportTest.java  |   3 +-
 .../protonj2/test/driver/ProtonTestServer.java     |   4 +
 .../test/driver/netty/netty4/Netty4Client.java     |   2 +-
 .../test/driver/netty/netty4/Netty4Server.java     |   2 +-
 .../protonj2/test/driver/ProtonTestClientTest.java |   3 +
 .../qpid/protonj2/test/driver/utils/Wait.java      | 105 +++++++++++++++++++++
 9 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
index a21f06e6..13d2c55e 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -154,7 +155,15 @@ public class TcpTransport implements Transport {
 
         configureNetty(bootstrap, options);
 
-        bootstrap.connect(getHost(), getPort()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        bootstrap.connect(getHost(), getPort()).addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    handleTransportFailure(future.channel(), future.cause());
+                }
+            }
+        });
 
         return this;
     }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
index 5d3aae6c..9e28d65f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
@@ -153,7 +153,7 @@ public class WebSocketTransport extends TcpTransport {
         pipeline.addLast(new HttpClientCodec());
         pipeline.addLast(new HttpObjectAggregator(8192));
         if (options.webSocketCompression()) {
-            pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
+            pipeline.addLast(new WebSocketClientCompressionHandler(0));
         }
     }
 
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
index 87fbaac3..52b7273e 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
@@ -224,7 +224,7 @@ public abstract class NettyServer implements AutoCloseable {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new HttpObjectAggregator(65536));
                         if (isUseWebSocketCompression()) {
-                            ch.pipeline().addLast(new WebSocketServerCompressionHandler());
+                            ch.pipeline().addLast(new WebSocketServerCompressionHandler(0));
                         }
                         ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
@@ -250,6 +250,8 @@ public abstract class NettyServer implements AutoCloseable {
                 serverChannel.close().sync();
             } catch (InterruptedException e) {
                 LOG.trace("Error on server channel close:", e);
+            } finally {
+                serverChannel = null;
             }
 
             // Shut down all event loops to terminate all threads.
@@ -261,6 +263,10 @@ public abstract class NettyServer implements AutoCloseable {
             LOG.trace("Shutting down worker group");
             workerGroup.shutdownGracefully(0, timeout, TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout);
             LOG.trace("Worker group shut down");
+
+            // allow a chance for full termination
+            bossGroup.awaitTermination(10, TimeUnit.MILLISECONDS);
+            workerGroup.awaitTermination(10, TimeUnit.MILLISECONDS);
         }
     }
 
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
index b7e2b669..8acea840 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -79,7 +80,7 @@ public class TcpTransportTest extends ImperativeClientTestCase {
     protected volatile boolean transportInitialized;
     protected volatile boolean transportConnected;
     protected volatile boolean transportErrored;
-    protected final List<Throwable> exceptions = new ArrayList<>();
+    protected final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
     protected final List<ProtonBuffer> data = new ArrayList<>();
     protected final AtomicInteger bytesRead = new AtomicInteger();
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index 9b7d6420..a3b55997 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -182,6 +182,10 @@ public class ProtonTestServer extends ProtonTestPeer {
         return server.isWSCompressionActive();
     }
 
+    public boolean hasClientConnection() {
+        return server.hasClientConnection();
+    }
+
     @Override
     public AMQPTestDriver getDriver() {
         return driver;
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
index b31c6e90..baef09ea 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
@@ -507,7 +507,7 @@ public final class Netty4Client implements NettyClient {
             channel.pipeline().addLast(new HttpObjectAggregator(8192));
             if (options.isWebSocketCompression()) {
                 channel.pipeline().addLast(new ClientWSCompressionObserver());
-                channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
+                channel.pipeline().addLast(new WebSocketClientCompressionHandler(0));
             }
         }
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
index 94a891dc..bae6f674 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
@@ -284,7 +284,7 @@ public final class Netty4Server implements NettyServer {
                         ch.pipeline().addLast(new HttpObjectAggregator(65536));
                         if (options.isWebSocketCompression()) {
                             ch.pipeline().addLast(new ServerWSCompressionObserver());
-                            ch.pipeline().addLast(new WebSocketServerCompressionHandler());
+                            ch.pipeline().addLast(new WebSocketServerCompressionHandler(0));
                         }
                         ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
index e4036867..95192f44 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
 import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
 import org.apache.qpid.protonj2.test.driver.utils.TestPeerTestsBase;
+import org.apache.qpid.protonj2.test.driver.utils.Wait;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
@@ -129,6 +130,8 @@ class ProtonTestClientTest extends TestPeerTestsBase {
                 client.waitForScriptToComplete(5, TimeUnit.SECONDS);
             }
 
+            Wait.assertFalse(() -> peer.hasClientConnection());
+
             try (ProtonTestClient client = new ProtonTestClient()) {
                 client.connect(remoteURI.getHost(), remoteURI.getPort());
                 client.expectAMQPHeader();
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java
new file mode 100644
index 00000000..9e3d0d35
--- /dev/null
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.test.driver.utils;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+
+public class Wait {
+
+    public static final long MAX_WAIT_MILLIS = 10 * 1000;
+    public static final long SLEEP_MILLIS = 50;
+    public static final String DEFAULT_FAILURE_MESSAGE = "Expected condition was not met";
+
+    @FunctionalInterface
+    public interface Condition {
+        boolean isSatisfied() throws Exception;
+    }
+
+    public static void assertTrue(Condition condition) {
+        assertTrue(DEFAULT_FAILURE_MESSAGE, condition);
+    }
+
+    public static void assertFalse(Condition condition) throws Exception {
+        assertTrue(() -> !condition.isSatisfied());
+    }
+
+    public static void assertFalse(String failureMessage, Condition condition) {
+        assertTrue(failureMessage, () -> !condition.isSatisfied());
+    }
+
+    public static void assertFalse(String failureMessage, Condition condition, final long duration) {
+        assertTrue(failureMessage, () -> !condition.isSatisfied(), duration, SLEEP_MILLIS);
+    }
+
+    public static void assertFalse(Condition condition, final long duration, final long sleep) {
+        assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(), duration, sleep);
+    }
+
+    public static void assertTrue(Condition condition, final long duration) {
+        assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS);
+    }
+
+    public static void assertTrue(String failureMessage, Condition condition) {
+        assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
+    }
+
+    public static void assertTrue(String failureMessage, Condition condition, final long duration) {
+        assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
+    }
+
+    public static void assertTrue(Condition condition, final long duration, final long sleep) throws Exception {
+        assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
+    }
+
+    public static void assertTrue(String failureMessage, Condition condition, final long duration, final long sleep) {
+        boolean result = waitFor(condition, duration, sleep);
+
+        if (!result) {
+            fail(failureMessage);
+        }
+    }
+
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS);
+    }
+
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        return waitFor(condition, duration, SLEEP_MILLIS);
+    }
+
+    public static boolean waitFor(final Condition condition, final long durationMillis, final long sleepMillis) {
+        try {
+            final long expiry = System.currentTimeMillis() + durationMillis;
+            boolean conditionSatisfied = condition.isSatisfied();
+
+            while (!conditionSatisfied && System.currentTimeMillis() < expiry) {
+                if (sleepMillis == 0) {
+                    Thread.yield();
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
+                }
+                conditionSatisfied = condition.isSatisfied();
+            }
+
+            return conditionSatisfied;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to