This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad54f51 HDFS-16175.Improve the configurable value of Server
#PURGE_INTERVAL_NANOS. (#3307)
ad54f51 is described below
commit ad54f5195c8c01f333703c55cd70703109d75f29
Author: jianghuazhu <[email protected]>
AuthorDate: Wed Aug 25 17:34:45 2021 +0800
HDFS-16175.Improve the configurable value of Server #PURGE_INTERVAL_NANOS.
(#3307)
Co-authored-by: zhujianghua <[email protected]>
Reviewed-by: Ayush Saxena <[email protected]>
---
.../hadoop/fs/CommonConfigurationKeysPublic.java | 4 +++
.../main/java/org/apache/hadoop/ipc/Server.java | 29 +++++++++++++++++-----
.../src/main/resources/core-default.xml | 8 ++++++
.../java/org/apache/hadoop/ipc/TestServer.java | 20 +++++++++++++++
4 files changed, 55 insertions(+), 6 deletions(-)
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 20bb035..968de79 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -494,6 +494,10 @@ public class CommonConfigurationKeysPublic {
"ipc.server.log.slow.rpc";
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
+ public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
+ "ipc.server.purge.interval";
+ public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;
+
/**
* @see
* <a
href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 1e67203..bc51a70 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -488,6 +488,8 @@ public abstract class Server {
volatile private boolean running = true; // true while server runs
private CallQueueManager<Call> callQueue;
+ private long purgeIntervalNanos;
+
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
private Listener listener = null;
@@ -517,6 +519,20 @@ public abstract class Server {
this.logSlowRPC = logSlowRPCFlag;
}
+ private void setPurgeIntervalNanos(int purgeInterval) {
+ int tmpPurgeInterval = CommonConfigurationKeysPublic.
+ IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT;
+ if (purgeInterval > 0) {
+ tmpPurgeInterval = purgeInterval;
+ }
+ this.purgeIntervalNanos = TimeUnit.NANOSECONDS.convert(
+ tmpPurgeInterval, TimeUnit.MINUTES);
+ }
+
+ @VisibleForTesting
+ public long getPurgeIntervalNanos() {
+ return this.purgeIntervalNanos;
+ }
/**
* Logs a Slow RPC Request.
@@ -1591,9 +1607,6 @@ public abstract class Server {
}
}
- private final static long PURGE_INTERVAL_NANOS =
TimeUnit.NANOSECONDS.convert(
- 15, TimeUnit.MINUTES);
-
// Sends responses of RPC back to clients.
private class Responder extends Thread {
private final Selector writeSelector;
@@ -1629,7 +1642,7 @@ public abstract class Server {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(
- TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
+ TimeUnit.NANOSECONDS.toMillis(purgeIntervalNanos));
Iterator<SelectionKey> iter =
writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
@@ -1652,7 +1665,7 @@ public abstract class Server {
}
}
long nowNanos = Time.monotonicNowNanos();
- if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
+ if (nowNanos < lastPurgeTimeNanos + purgeIntervalNanos) {
continue;
}
lastPurgeTimeNanos = nowNanos;
@@ -1730,7 +1743,7 @@ public abstract class Server {
Iterator<RpcCall> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
- if (now > call.responseTimestampNanos + PURGE_INTERVAL_NANOS) {
+ if (now > call.responseTimestampNanos + purgeIntervalNanos) {
closeConnection(call.connection);
break;
}
@@ -3239,6 +3252,10 @@ public abstract class Server {
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
+ this.setPurgeIntervalNanos(conf.getInt(
+ CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
+
CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));
+
// Create the responder here
responder = new Responder();
diff --git
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1a4a6bf..15075a4 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2461,6 +2461,14 @@
</property>
<property>
+ <name>ipc.server.purge.interval</name>
+ <value>15</value>
+ <description>Define how often calls are cleaned up in the server.
+ The default is 15 minutes. The unit is minutes.
+ </description>
+</property>
+
+<property>
<name>ipc.maximum.data.length</name>
<value>134217728</value>
<description>This indicates the maximum IPC message length (bytes) that can
be
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java
index fbaa75b..748d99e 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java
@@ -25,8 +25,10 @@ import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Server.Call;
@@ -184,4 +186,22 @@ public class TestServer {
assertTrue(handler.isSuppressedLog(IpcException.class));
assertFalse(handler.isSuppressedLog(RpcClientException.class));
}
+
+ @Test (timeout=300000)
+ public void testPurgeIntervalNanosConf() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(CommonConfigurationKeysPublic.
+ IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY, 3);
+ Server server = new Server("0.0.0.0", 0, LongWritable.class,
+ 1, conf) {
+ @Override
+ public Writable call(
+ RPC.RpcKind rpcKind, String protocol, Writable param,
+ long receiveTime) throws Exception {
+ return null;
+ }
+ };
+ long purgeInterval = TimeUnit.NANOSECONDS.convert(3, TimeUnit.MINUTES);
+ assertEquals(server.getPurgeIntervalNanos(), purgeInterval);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]