This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 85accd6b86 Increase Thrift max message size defaults (#3873)
85accd6b86 is described below
commit 85accd6b861d5fd0038bf0fa63181b6e60af64cb
Author: Christopher Tubbs <[email protected]>
AuthorDate: Wed Jul 24 21:16:32 2024 -0400
Increase Thrift max message size defaults (#3873)
* Deprecate max message properties in favor of a replacement
* Simply remove the experimental ones (but they could be restored and
deprecated like the non-experimental ones, if that's desired)
* Ensure new property defaults to `Integer.MAX_VALUE`
* Handle detecting whether a user set the new property and fall back on
the old properties if a user customized them already
* Set the default value for the maxMessageSize to `Integer.MAX_VALUE` in
`TServerUtils`. However, that case should only apply when the property
passed to the method is `null`, which it never should be. A future
improvement could validate that the property is non-null instead.
* Ensure all the max message properties that remain are added to the
fixed properties, since they all require a restart to take effect
* Slightly simplify related code in `TabletServer` to avoid passing
`AccumuloConfiguration` when it's already available via a class method
This fixes #3094
This fixes #3739
This fixes #3791
---
.../java/org/apache/accumulo/core/conf/Property.java | 20 ++++++++------------
.../org/apache/accumulo/server/rpc/TServerUtils.java | 2 +-
.../apache/accumulo/server/rpc/TServerUtilsTest.java | 2 +-
.../accumulo/coordinator/CompactionCoordinator.java | 6 +++---
.../org/apache/accumulo/compactor/Compactor.java | 6 +++---
.../apache/accumulo/gc/SimpleGarbageCollector.java | 5 ++++-
.../java/org/apache/accumulo/manager/Manager.java | 10 ++++++++--
.../java/org/apache/accumulo/tserver/ScanServer.java | 6 +++---
.../org/apache/accumulo/tserver/TabletServer.java | 14 +++++++-------
.../test/functional/ThriftMaxFrameSizeIT.java | 3 +--
10 files changed, 39 insertions(+), 35 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 445e693c60..8629cf1681 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -54,6 +54,9 @@ public enum Property {
"Properties in this category related to the configuration of SSL keys
for"
+ " RPC. See also `instance.ssl.enabled`.",
"1.6.0"),
+ RPC_MAX_MESSAGE_SIZE("rpc.message.size.max",
Integer.toString(Integer.MAX_VALUE),
+ PropertyType.BYTES, "The maximum size of a message that can be received
by a server.",
+ "2.1.3"),
RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT,
"Configures the TCP backlog for the server side sockets created by
Thrift."
+ " This property is not used for SSL type server sockets. A value
of zero"
@@ -261,6 +264,8 @@ public enum Property {
+ " This does not equate to how often tickets are actually renewed
(which is"
+ " performed at 80% of the ticket lifetime).",
"1.6.5"),
+ @Deprecated(since = "2.1.3")
+ @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE)
GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G",
PropertyType.BYTES,
"The maximum size of a message that can be sent to a server.", "1.5.0"),
@Experimental
@@ -468,9 +473,6 @@ public enum Property {
SSERV_CLIENTPORT("sserver.port.client", "9996", PropertyType.PORT,
"The port used for handling client connections on the tablet servers.",
"2.1.0"),
@Experimental
- SSERV_MAX_MESSAGE_SIZE("sserver.server.message.size.max", "1G",
PropertyType.BYTES,
- "The maximum size of a message that can be sent to a scan server.",
"2.1.0"),
- @Experimental
SSERV_MINTHREADS("sserver.server.threads.minimum", "2", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.",
"2.1.0"),
@Experimental
@@ -793,6 +795,8 @@ public enum Property {
"2.1.0"),
TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s",
PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool.", "1.4.0"),
+ @Deprecated(since = "2.1.3")
+ @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE)
TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G",
PropertyType.BYTES,
"The maximum size of a message that can be sent to a tablet server.",
"1.6.0"),
TSERV_LOG_BUSY_TABLETS_COUNT("tserver.log.busy.tablets.count", "0",
PropertyType.COUNT,
@@ -1479,9 +1483,6 @@ public enum Property {
@Experimental
COMPACTOR_THREADCHECK("compactor.threadcheck.time", "1s",
PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool.", "2.1.0"),
- @Experimental
- COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M",
PropertyType.BYTES,
- "The maximum size of a message that can be sent to a tablet server.",
"2.1.0"),
// CompactionCoordinator properties
@Experimental
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null,
PropertyType.PREFIX,
@@ -1509,10 +1510,6 @@ public enum Property {
PropertyType.TIMEDURATION, "The time between adjustments of the server
thread pool.",
"2.1.0"),
@Experimental
-
COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE("compaction.coordinator.message.size.max",
"10M",
- PropertyType.BYTES, "The maximum size of a message that can be sent to a
tablet server.",
- "2.1.0"),
- @Experimental
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m",
PropertyType.TIMEDURATION,
"The interval at which to check for dead compactors.", "2.1.0"),
@@ -1852,8 +1849,7 @@ public enum Property {
COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH,
// max message options
- SSERV_MAX_MESSAGE_SIZE, TSERV_MAX_MESSAGE_SIZE,
COMPACTOR_MAX_MESSAGE_SIZE,
- COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE,
+ TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE,
// block cache options
TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 9375b3af90..87bfb4c0c8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -153,7 +153,7 @@ public class TServerUtils {
timeBetweenThreadChecks =
config.getTimeInMillis(timeBetweenThreadChecksProperty);
}
- long maxMessageSize = 10_000_000;
+ long maxMessageSize = Integer.MAX_VALUE;
if (maxMessageSizeProperty != null) {
maxMessageSize = config.getAsBytes(maxMessageSizeProperty);
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
index d8230e80d5..81e9cfa49c 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
@@ -306,7 +306,7 @@ public class TServerUtilsTest {
return TServerUtils.startServer(context, hostname,
Property.TSERV_CLIENTPORT, processor,
"TServerUtilsTest", "TServerUtilsTestThread",
Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT,
Property.TSERV_THREADCHECK,
- Property.GENERAL_MAX_MESSAGE_SIZE);
+ Property.RPC_MAX_MESSAGE_SIZE);
}
}
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index af154442ef..16452b0805 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -243,9 +243,9 @@ public class CompactionCoordinator extends AbstractServer
*/
protected ServerAddress startCoordinatorClientService() throws
UnknownHostException {
var processor = ThriftProcessorTypes.getCoordinatorTProcessor(this,
getContext());
- Property maxMessageSizeProperty =
-
(getConfiguration().get(Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE) !=
null
- ? Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE :
Property.GENERAL_MAX_MESSAGE_SIZE);
+ @SuppressWarnings("deprecation")
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
Property.COMPACTION_COORDINATOR_CLIENTPORT, processor,
this.getClass().getSimpleName(),
"Thrift Client Server",
Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH,
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 87922d335f..033bb8c79d 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -330,9 +330,9 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
*/
protected ServerAddress startCompactorClientService() throws
UnknownHostException {
var processor = ThriftProcessorTypes.getCompactorTProcessor(this,
getContext());
- Property maxMessageSizeProperty =
- (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null
- ? Property.COMPACTOR_MAX_MESSAGE_SIZE :
Property.GENERAL_MAX_MESSAGE_SIZE);
+ @SuppressWarnings("deprecation")
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
Property.COMPACTOR_CLIENTPORT, processor,
this.getClass().getSimpleName(),
"Thrift Client Server", Property.COMPACTOR_PORTSEARCH,
Property.COMPACTOR_MINTHREADS,
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 9cafbd1fe1..ee2f31d1f1 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -396,7 +396,10 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
var processor = ThriftProcessorTypes.getGcTProcessor(this, getContext());
IntStream port = getConfiguration().getPortStream(Property.GC_PORT);
HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(),
port);
- long maxMessageSize =
getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
+ @SuppressWarnings("deprecation")
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
+ long maxMessageSize =
getConfiguration().getAsBytes(maxMessageSizeProperty);
ServerAddress server = TServerUtils.startTServer(getConfiguration(),
getContext().getThriftServerType(), processor,
this.getClass().getSimpleName(),
"GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
maxMessageSize,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 514c0271d4..16548ea6d0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1214,10 +1214,13 @@ public class Manager extends AbstractServer
ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy,
getContext());
try {
+ @SuppressWarnings("deprecation")
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
sa = TServerUtils.startServer(context, getHostname(),
Property.MANAGER_CLIENTPORT, processor,
"Manager", "Manager Client Service Handler", null,
Property.MANAGER_MINTHREADS,
Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK,
- Property.GENERAL_MAX_MESSAGE_SIZE);
+ maxMessageSizeProperty);
} catch (UnknownHostException e) {
throw new IllegalStateException("Unable to start server on host " +
getHostname(), e);
}
@@ -1569,10 +1572,13 @@ public class Manager extends AbstractServer
var processor =
ThriftProcessorTypes.getReplicationCoordinatorTProcessor(haReplicationProxy,
getContext());
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
+
ServerAddress replAddress = TServerUtils.startServer(context,
getHostname(),
Property.MANAGER_REPLICATION_COORDINATOR_PORT, processor, "Manager
Replication Coordinator",
"Replication Coordinator", null,
Property.MANAGER_REPLICATION_COORDINATOR_MINTHREADS, null,
- Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK,
Property.GENERAL_MAX_MESSAGE_SIZE);
+ Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK,
maxMessageSizeProperty);
log.info("Started replication coordinator service at " +
replAddress.address);
// Start the daemon to scan the replication table and make units of work
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index f959d2a010..ad9c72d0c0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -306,9 +306,9 @@ public class ScanServer extends AbstractServer
// to set up the ThriftProcessor using this class, not the delegate.
TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this,
getContext());
- Property maxMessageSizeProperty =
- (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
- ? Property.SSERV_MAX_MESSAGE_SIZE :
Property.GENERAL_MAX_MESSAGE_SIZE);
+ @SuppressWarnings("deprecation")
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
"Thrift Client Server", Property.SSERV_PORTSEARCH,
Property.SSERV_MINTHREADS,
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bf69a93534..79c969cf77 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -544,10 +544,11 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
}
}
- private HostAndPort startServer(AccumuloConfiguration conf, String address,
TProcessor processor)
+ private HostAndPort startServer(String address, TProcessor processor)
throws UnknownHostException {
- Property maxMessageSizeProperty =
(conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
- ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+ @SuppressWarnings("deprecation")
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getContext(), address,
Property.TSERV_CLIENTPORT,
processor, this.getClass().getSimpleName(), "Thrift Client Server",
Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS,
Property.TSERV_MINTHREADS_TIMEOUT,
@@ -612,7 +613,7 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
TProcessor processor =
ThriftProcessorTypes.getTabletServerTProcessor(clientHandler,
thriftClientHandler, scanClientHandler, getContext());
- HostAndPort address = startServer(getConfiguration(),
clientAddress.getHost(), processor);
+ HostAndPort address = startServer(clientAddress.getHost(), processor);
log.info("address = {}", address);
return address;
}
@@ -622,9 +623,8 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
final var handler =
new
org.apache.accumulo.tserver.replication.ReplicationServicerHandler(this);
var processor =
ThriftProcessorTypes.getReplicationClientTProcessor(handler, getContext());
- Property maxMessageSizeProperty =
- getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null
- ? Property.TSERV_MAX_MESSAGE_SIZE :
Property.GENERAL_MAX_MESSAGE_SIZE;
+ var maxMessageSizeProperty =
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+ Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getContext(),
clientAddress.getHost(),
Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
"ReplicationServicerHandler",
"Replication Servicer", Property.TSERV_PORTSEARCH,
Property.REPLICATION_MIN_THREADS, null,
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
index 0a94d0a974..faf15ce6d6 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
@@ -92,8 +92,7 @@ public class ThriftMaxFrameSizeIT {
cfg.setNumTservers(1);
cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
String maxFrameSizeStr = Integer.toString(CONFIGURED_MAX_FRAME_SIZE);
- cfg.setProperty(Property.GENERAL_MAX_MESSAGE_SIZE, maxFrameSizeStr);
- cfg.setProperty(Property.TSERV_MAX_MESSAGE_SIZE, maxFrameSizeStr);
+ cfg.setProperty(Property.RPC_MAX_MESSAGE_SIZE, maxFrameSizeStr);
if (serverType == ThriftServerType.SSL) {
configureForSsl(cfg,
getSslDir(createTestDir(this.getClass().getName() + "_" +
this.testName())));