This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 85accd6b86 Increase Thrift max message size defaults (#3873)
85accd6b86 is described below

commit 85accd6b861d5fd0038bf0fa63181b6e60af64cb
Author: Christopher Tubbs <[email protected]>
AuthorDate: Wed Jul 24 21:16:32 2024 -0400

    Increase Thrift max message size defaults (#3873)
    
    * Deprecate max message properties in favor of a replacement
    * Simply remove the experimental ones (but they could be restored and
      deprecated like the non-experimental ones, if that's desired)
    * Ensure new property defaults to `Integer.MAX_VALUE`
    * Handle detecting whether a user set the new property and fall back on
      the old properties if a user customized them already
    * Set the default value for the maxMessageSize to `Integer.MAX_VALUE` in
      `TServerUtils`. However, that case should only apply when the property
      passed to the method is `null`, which it never should be. A future
      improvement could validate that the property is non-null instead.
    * Ensure all the max message properties that remain are added to the
      fixed properties, since they all require a restart to take effect
    * Slightly simplify related code in `TabletServer` to avoid passing
      `AccumuloConfiguration` when it's already available via a class method
    
    This fixes #3094
    This fixes #3739
    This fixes #3791
---
 .../java/org/apache/accumulo/core/conf/Property.java | 20 ++++++++------------
 .../org/apache/accumulo/server/rpc/TServerUtils.java |  2 +-
 .../apache/accumulo/server/rpc/TServerUtilsTest.java |  2 +-
 .../accumulo/coordinator/CompactionCoordinator.java  |  6 +++---
 .../org/apache/accumulo/compactor/Compactor.java     |  6 +++---
 .../apache/accumulo/gc/SimpleGarbageCollector.java   |  5 ++++-
 .../java/org/apache/accumulo/manager/Manager.java    | 10 ++++++++--
 .../java/org/apache/accumulo/tserver/ScanServer.java |  6 +++---
 .../org/apache/accumulo/tserver/TabletServer.java    | 14 +++++++-------
 .../test/functional/ThriftMaxFrameSizeIT.java        |  3 +--
 10 files changed, 39 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 445e693c60..8629cf1681 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -54,6 +54,9 @@ public enum Property {
       "Properties in this category related to the configuration of SSL keys 
for"
           + " RPC. See also `instance.ssl.enabled`.",
       "1.6.0"),
+  RPC_MAX_MESSAGE_SIZE("rpc.message.size.max", 
Integer.toString(Integer.MAX_VALUE),
+      PropertyType.BYTES, "The maximum size of a message that can be received 
by a server.",
+      "2.1.3"),
   RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT,
       "Configures the TCP backlog for the server side sockets created by 
Thrift."
           + " This property is not used for SSL type server sockets. A value 
of zero"
@@ -261,6 +264,8 @@ public enum Property {
           + " This does not equate to how often tickets are actually renewed 
(which is"
           + " performed at 80% of the ticket lifetime).",
       "1.6.5"),
+  @Deprecated(since = "2.1.3")
+  @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE)
   GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", 
PropertyType.BYTES,
       "The maximum size of a message that can be sent to a server.", "1.5.0"),
   @Experimental
@@ -468,9 +473,6 @@ public enum Property {
   SSERV_CLIENTPORT("sserver.port.client", "9996", PropertyType.PORT,
       "The port used for handling client connections on the tablet servers.", 
"2.1.0"),
   @Experimental
-  SSERV_MAX_MESSAGE_SIZE("sserver.server.message.size.max", "1G", 
PropertyType.BYTES,
-      "The maximum size of a message that can be sent to a scan server.", 
"2.1.0"),
-  @Experimental
   SSERV_MINTHREADS("sserver.server.threads.minimum", "2", PropertyType.COUNT,
       "The minimum number of threads to use to handle incoming requests.", 
"2.1.0"),
   @Experimental
@@ -793,6 +795,8 @@ public enum Property {
       "2.1.0"),
   TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", 
PropertyType.TIMEDURATION,
       "The time between adjustments of the server thread pool.", "1.4.0"),
+  @Deprecated(since = "2.1.3")
+  @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE)
   TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", 
PropertyType.BYTES,
       "The maximum size of a message that can be sent to a tablet server.", 
"1.6.0"),
   TSERV_LOG_BUSY_TABLETS_COUNT("tserver.log.busy.tablets.count", "0", 
PropertyType.COUNT,
@@ -1479,9 +1483,6 @@ public enum Property {
   @Experimental
   COMPACTOR_THREADCHECK("compactor.threadcheck.time", "1s", 
PropertyType.TIMEDURATION,
       "The time between adjustments of the server thread pool.", "2.1.0"),
-  @Experimental
-  COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M", 
PropertyType.BYTES,
-      "The maximum size of a message that can be sent to a tablet server.", 
"2.1.0"),
   // CompactionCoordinator properties
   @Experimental
   COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, 
PropertyType.PREFIX,
@@ -1509,10 +1510,6 @@ public enum Property {
       PropertyType.TIMEDURATION, "The time between adjustments of the server 
thread pool.",
       "2.1.0"),
   @Experimental
-  
COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE("compaction.coordinator.message.size.max",
 "10M",
-      PropertyType.BYTES, "The maximum size of a message that can be sent to a 
tablet server.",
-      "2.1.0"),
-  @Experimental
   COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
       "compaction.coordinator.compactor.dead.check.interval", "5m", 
PropertyType.TIMEDURATION,
       "The interval at which to check for dead compactors.", "2.1.0"),
@@ -1852,8 +1849,7 @@ public enum Property {
       COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH,
 
       // max message options
-      SSERV_MAX_MESSAGE_SIZE, TSERV_MAX_MESSAGE_SIZE, 
COMPACTOR_MAX_MESSAGE_SIZE,
-      COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE,
+      TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE,
 
       // block cache options
       TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 9375b3af90..87bfb4c0c8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -153,7 +153,7 @@ public class TServerUtils {
       timeBetweenThreadChecks = 
config.getTimeInMillis(timeBetweenThreadChecksProperty);
     }
 
-    long maxMessageSize = 10_000_000;
+    long maxMessageSize = Integer.MAX_VALUE;
     if (maxMessageSizeProperty != null) {
       maxMessageSize = config.getAsBytes(maxMessageSizeProperty);
     }
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
index d8230e80d5..81e9cfa49c 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
@@ -306,7 +306,7 @@ public class TServerUtilsTest {
     return TServerUtils.startServer(context, hostname, 
Property.TSERV_CLIENTPORT, processor,
         "TServerUtilsTest", "TServerUtilsTestThread", 
Property.TSERV_PORTSEARCH,
         Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, 
Property.TSERV_THREADCHECK,
-        Property.GENERAL_MAX_MESSAGE_SIZE);
+        Property.RPC_MAX_MESSAGE_SIZE);
 
   }
 }
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index af154442ef..16452b0805 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -243,9 +243,9 @@ public class CompactionCoordinator extends AbstractServer
    */
   protected ServerAddress startCoordinatorClientService() throws 
UnknownHostException {
     var processor = ThriftProcessorTypes.getCoordinatorTProcessor(this, 
getContext());
-    Property maxMessageSizeProperty =
-        
(getConfiguration().get(Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE) != 
null
-            ? Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
+    @SuppressWarnings("deprecation")
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.GENERAL_MAX_MESSAGE_SIZE);
     ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
         Property.COMPACTION_COORDINATOR_CLIENTPORT, processor, 
this.getClass().getSimpleName(),
         "Thrift Client Server", 
Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH,
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 87922d335f..033bb8c79d 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -330,9 +330,9 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
    */
   protected ServerAddress startCompactorClientService() throws 
UnknownHostException {
     var processor = ThriftProcessorTypes.getCompactorTProcessor(this, 
getContext());
-    Property maxMessageSizeProperty =
-        (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null
-            ? Property.COMPACTOR_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
+    @SuppressWarnings("deprecation")
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.GENERAL_MAX_MESSAGE_SIZE);
     ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
         Property.COMPACTOR_CLIENTPORT, processor, 
this.getClass().getSimpleName(),
         "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, 
Property.COMPACTOR_MINTHREADS,
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 9cafbd1fe1..ee2f31d1f1 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -396,7 +396,10 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
     var processor = ThriftProcessorTypes.getGcTProcessor(this, getContext());
     IntStream port = getConfiguration().getPortStream(Property.GC_PORT);
     HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), 
port);
-    long maxMessageSize = 
getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
+    @SuppressWarnings("deprecation")
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.GENERAL_MAX_MESSAGE_SIZE);
+    long maxMessageSize = 
getConfiguration().getAsBytes(maxMessageSizeProperty);
     ServerAddress server = TServerUtils.startTServer(getConfiguration(),
         getContext().getThriftServerType(), processor, 
this.getClass().getSimpleName(),
         "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 
maxMessageSize,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 514c0271d4..16548ea6d0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1214,10 +1214,13 @@ public class Manager extends AbstractServer
         ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy, 
getContext());
 
     try {
+      @SuppressWarnings("deprecation")
+      var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+          Property.GENERAL_MAX_MESSAGE_SIZE);
       sa = TServerUtils.startServer(context, getHostname(), 
Property.MANAGER_CLIENTPORT, processor,
           "Manager", "Manager Client Service Handler", null, 
Property.MANAGER_MINTHREADS,
           Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK,
-          Property.GENERAL_MAX_MESSAGE_SIZE);
+          maxMessageSizeProperty);
     } catch (UnknownHostException e) {
       throw new IllegalStateException("Unable to start server on host " + 
getHostname(), e);
     }
@@ -1569,10 +1572,13 @@ public class Manager extends AbstractServer
     var processor =
         
ThriftProcessorTypes.getReplicationCoordinatorTProcessor(haReplicationProxy, 
getContext());
 
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.GENERAL_MAX_MESSAGE_SIZE);
+
     ServerAddress replAddress = TServerUtils.startServer(context, 
getHostname(),
         Property.MANAGER_REPLICATION_COORDINATOR_PORT, processor, "Manager 
Replication Coordinator",
         "Replication Coordinator", null, 
Property.MANAGER_REPLICATION_COORDINATOR_MINTHREADS, null,
-        Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
+        Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, 
maxMessageSizeProperty);
 
     log.info("Started replication coordinator service at " + 
replAddress.address);
     // Start the daemon to scan the replication table and make units of work
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index f959d2a010..ad9c72d0c0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -306,9 +306,9 @@ public class ScanServer extends AbstractServer
     // to set up the ThriftProcessor using this class, not the delegate.
     TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, 
getContext());
 
-    Property maxMessageSizeProperty =
-        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
-            ? Property.SSERV_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
+    @SuppressWarnings("deprecation")
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.GENERAL_MAX_MESSAGE_SIZE);
     ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
         Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
         "Thrift Client Server", Property.SSERV_PORTSEARCH, 
Property.SSERV_MINTHREADS,
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bf69a93534..79c969cf77 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -544,10 +544,11 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
     }
   }
 
-  private HostAndPort startServer(AccumuloConfiguration conf, String address, 
TProcessor processor)
+  private HostAndPort startServer(String address, TProcessor processor)
       throws UnknownHostException {
-    Property maxMessageSizeProperty = 
(conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
-        ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    @SuppressWarnings("deprecation")
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE);
     ServerAddress sp = TServerUtils.startServer(getContext(), address, 
Property.TSERV_CLIENTPORT,
         processor, this.getClass().getSimpleName(), "Thrift Client Server",
         Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, 
Property.TSERV_MINTHREADS_TIMEOUT,
@@ -612,7 +613,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
 
     TProcessor processor = 
ThriftProcessorTypes.getTabletServerTProcessor(clientHandler,
         thriftClientHandler, scanClientHandler, getContext());
-    HostAndPort address = startServer(getConfiguration(), 
clientAddress.getHost(), processor);
+    HostAndPort address = startServer(clientAddress.getHost(), processor);
     log.info("address = {}", address);
     return address;
   }
@@ -622,9 +623,8 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
     final var handler =
         new 
org.apache.accumulo.tserver.replication.ReplicationServicerHandler(this);
     var processor = 
ThriftProcessorTypes.getReplicationClientTProcessor(handler, getContext());
-    Property maxMessageSizeProperty =
-        getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null
-            ? Property.TSERV_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE;
+    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
+        Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE);
     ServerAddress sp = TServerUtils.startServer(getContext(), 
clientAddress.getHost(),
         Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, 
"ReplicationServicerHandler",
         "Replication Servicer", Property.TSERV_PORTSEARCH, 
Property.REPLICATION_MIN_THREADS, null,
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
index 0a94d0a974..faf15ce6d6 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
@@ -92,8 +92,7 @@ public class ThriftMaxFrameSizeIT {
       cfg.setNumTservers(1);
       cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
       String maxFrameSizeStr = Integer.toString(CONFIGURED_MAX_FRAME_SIZE);
-      cfg.setProperty(Property.GENERAL_MAX_MESSAGE_SIZE, maxFrameSizeStr);
-      cfg.setProperty(Property.TSERV_MAX_MESSAGE_SIZE, maxFrameSizeStr);
+      cfg.setProperty(Property.RPC_MAX_MESSAGE_SIZE, maxFrameSizeStr);
       if (serverType == ThriftServerType.SSL) {
         configureForSsl(cfg,
             getSslDir(createTestDir(this.getClass().getName() + "_" + 
this.testName())));

Reply via email to