This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 094dbc8 [AMQ-8515] FailoverTransport should handle
MaxFrameSizeExceededException (#785)
094dbc8 is described below
commit 094dbc89f3630691f1bd66d0d07fe879842fa51b
Author: Matt Pavlovich <[email protected]>
AuthorDate: Fri Feb 25 19:46:45 2022 -0600
[AMQ-8515] FailoverTransport should handle MaxFrameSizeExceededException
(#785)
---
.../transport/failover/FailoverTransport.java | 4 +
.../transport/MaxFrameSizeEnabledTest.java | 128 ++++++++++++++-------
2 files changed, 92 insertions(+), 40 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index dcb4cb8..76062e4 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -39,6 +39,7 @@ import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.MaxFrameSizeExceededException;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -697,6 +698,9 @@ public class FailoverTransport implements
CompositeTransport {
}
return;
+ } catch (MaxFrameSizeExceededException e) {
+ LOG.debug("MaxFrameSizeExceededException for command:
{}", command);
+ throw e;
} catch (IOException e) {
LOG.debug("Send oneway attempt: {} failed for command:
{}", i, command);
handleTransportFailure(e);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
index d2ce82e..8f98b8f 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
@@ -56,56 +56,86 @@ public class MaxFrameSizeEnabledTest {
private BrokerService broker;
private final String transportType;
private final boolean clientSideEnabled;
+ private final boolean clientSideFailoverEnabled;
private final boolean serverSideEnabled;
- public MaxFrameSizeEnabledTest(String transportType, boolean
clientSideEnabled, boolean serverSideEnabled) {
+ public MaxFrameSizeEnabledTest(String transportType, boolean
clientSideEnabled, boolean clientSideFailoverEnabled, boolean
serverSideEnabled) {
this.transportType = transportType;
this.clientSideEnabled = clientSideEnabled;
+ this.clientSideFailoverEnabled = clientSideFailoverEnabled;
this.serverSideEnabled = serverSideEnabled;
}
-
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}")
+
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},clientSideFailoverEnable={2},serverSideEnabled={3}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
//Both client and server side max frame check enabled
- {"tcp", true, true},
- {"ssl", true, true},
- {"nio", true, true},
- {"nio+ssl", true, true},
- {"auto", true, true},
- {"auto+ssl", true, true},
- {"auto+nio", true, true},
- {"auto+nio+ssl", true, true},
+ {"tcp", true, false, true},
+ {"tcp", true, true, true},
+ {"ssl", true, false, true},
+ {"ssl", true, true, true},
+ {"nio", true, false, true},
+ {"nio", true, true, true},
+ {"nio+ssl", true, false, true},
+ {"nio+ssl", true, true, true},
+ {"auto", true, false, true},
+ {"auto", true, true, true},
+ {"auto+ssl", true, false, true},
+ {"auto+ssl", true, true, true},
+ {"auto+nio", true, false, true},
+ {"auto+nio", true, true, true},
+ {"auto+nio+ssl", true, false, true},
+ {"auto+nio+ssl", true, true, true},
//Client side enabled but server side disabled
- {"tcp", true, false},
- {"ssl", true, false},
- {"nio", true, false},
- {"nio+ssl", true, false},
- {"auto", true, false},
- {"auto+ssl", true, false},
- {"auto+nio", true, false},
- {"auto+nio+ssl", true, false},
+ {"tcp", true, false, false},
+ {"tcp", true, true, false},
+ {"ssl", true, false, false},
+ {"ssl", true, true, false},
+ {"nio", true, false, false},
+ {"nio", true, true, false},
+ {"nio+ssl", true, false, false},
+ {"nio+ssl", true, true, false},
+ {"auto", true, false, false},
+ {"auto", true, true, false},
+ {"auto+ssl", true, false, false},
+ {"auto+ssl", true, true, false},
+ {"auto+nio", true, false, false},
+ {"auto+nio", true, true, false},
+ {"auto+nio+ssl", true, false, false},
+ {"auto+nio+ssl", true, true, false},
//Client side disabled but server side enabled
- {"tcp", false, true},
- {"ssl", false, true},
- {"nio", false, true},
- {"nio+ssl", false, true},
- {"auto", false, true},
- {"auto+ssl", false, true},
- {"auto+nio", false, true},
- {"auto+nio+ssl", false, true},
+ //
+ // AMQ-8515 client=false, failover=true, server=true
+ // results in infinite retries since broker closes
+ // socket, so we don't test that combo
+ {"tcp", false, false, true},
+ {"ssl", false, false, true},
+ {"nio", false, false, true},
+ {"nio+ssl", false, false, true},
+ {"auto", false, false, true},
+ {"auto+ssl", false, false, true},
+ {"auto+nio", false, false, true},
+ {"auto+nio+ssl", false, false, true},
//Client side and server side disabled
- {"tcp", false, false},
- {"ssl", false, false},
- {"nio", false, false},
- {"nio+ssl", false, false},
- {"auto", false, false},
- {"auto+ssl", false, false},
- {"auto+nio", false, false},
- {"auto+nio+ssl", false, false},
+ {"tcp", false, false, false},
+ {"tcp", false, true, false},
+ {"ssl", false, false, false},
+ {"ssl", false, true, false},
+ {"nio", false, false, false},
+ {"nio", false, true, false},
+ {"nio+ssl", false, false, false},
+ {"nio+ssl", false, true, false},
+ {"auto", false, false, false},
+ {"auto", false, true, false},
+ {"auto+ssl", false, false, false},
+ {"auto+ssl", false, true, false},
+ {"auto+nio", false, false, false},
+ {"auto+nio", false, true, false},
+ {"auto+nio+ssl", false, false, false},
+ {"auto+nio+ssl", false, true, false},
});
}
@@ -145,16 +175,14 @@ public class MaxFrameSizeEnabledTest {
@Test
public void testMaxFrameSize() throws Exception {
broker = createBroker(transportType, transportType +
"://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
- testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") +
"://localhost:" +
broker.getConnectorByName(transportType).getConnectUri().getPort() +
- getClientParams(), false);
+ testMaxFrameSize(transportType,
getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()),
false);
}
@Test
public void testMaxFrameSizeCompression() throws Exception {
// Test message body length is 99841 bytes. Compresses to ~ 48000
broker = createBroker(transportType, transportType +
"://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
- testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") +
"://localhost:" +
broker.getConnectorByName(transportType).getConnectUri().getPort()
- + getClientParams(), true);
+ testMaxFrameSize(transportType,
getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()),
true);
}
protected void testMaxFrameSize(String transportType, String clientUri,
boolean useCompression) throws Exception {
@@ -276,6 +304,10 @@ public class MaxFrameSizeEnabledTest {
return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
}
+ private boolean isFailover() {
+ return clientSideFailoverEnabled;
+ }
+
private boolean isSsl() {
return transportType.contains("ssl");
}
@@ -294,9 +326,25 @@ public class MaxFrameSizeEnabledTest {
private String getClientParams() {
if (clientSideEnabled) {
- return isSsl() ? "?socket.verifyHostName=false" : "";
+ if(clientSideFailoverEnabled) {
+ return isSsl() ? "?nested.socket.verifyHostName=false" : "";
+ } else {
+ return isSsl() ? "?socket.verifyHostName=false" : "";
+ }
+ } else {
+ if(clientSideFailoverEnabled) {
+ return isSsl() ?
"?nested.socket.verifyHostName=false&nested.wireFormat.maxFrameSizeEnabled=false"
: "?nested.wireFormat.maxFrameSizeEnabled=false";
+ } else {
+ return isSsl() ?
"?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" :
"?wireFormat.maxFrameSizeEnabled=false";
+ }
+ }
+ }
+
+ private String getClientUri(int port) {
+ if(isFailover()) {
+ return "failover:(" + (isSsl() ? "ssl" : "tcp") + "://localhost:"
+ port + ")" + getClientParams() +
"&maxReconnectAttempts=1&startupMaxReconnectAttempts=1";
} else {
- return isSsl() ?
"?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" :
"?wireFormat.maxFrameSizeEnabled=false";
+ return (isSsl() ? "ssl" : "tcp") + "://localhost:" + port +
getClientParams();
}
}
}