[
https://issues.apache.org/jira/browse/HADOOP-13144?focusedWorklogId=789678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-789678
]
ASF GitHub Bot logged work on HADOOP-13144:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jul/22 17:39
Start Date: 11/Jul/22 17:39
Worklog Time Spent: 10m
Work Description: goiri commented on code in PR #4542:
URL: https://github.com/apache/hadoop/pull/4542#discussion_r918190241
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java:
##########
@@ -390,6 +399,56 @@ public void testProxyAddress() throws Exception {
}
}
+ @Test
+ public void testConnectionWithSocketFactory() throws IOException {
+ Server server;
+ TestRpcService firstProxy = null;
+ TestRpcService secondProxy = null;
+
+ Configuration newConf = new Configuration(conf);
+ newConf.set(CommonConfigurationKeysPublic.
+ HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+ RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+ newConf, "Test.No.Such.Key",
+ true, // defaultRetryPolicyEnabled = true
+ "Test.No.Such.Key", "10000,6",
+ null);
+
+ // create a server with two handlers
+ server = setupTestServer(newConf, 2);
+ try {
+ // create the first client
+ firstProxy = getClient(addr, newConf);
+ // create the second client
+ secondProxy = getClient(addr, newConf);
+
+ firstProxy.ping(null, newEmptyRequest());
+ secondProxy.ping(null, newEmptyRequest());
+
+ Client client = ProtobufRpcEngine2.getClient(newConf);
+ assertEquals(1, client.getConnectionIds().size());
+
+ stop(null, firstProxy, secondProxy);
+ ProtobufRpcEngine2.clearClientCache();
+
+ // create the first client with index 1
+ firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
+ // create the second client with index 2
+ secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
+ firstProxy.ping(null, newEmptyRequest());
+ secondProxy.ping(null, newEmptyRequest());
+
+ client = ProtobufRpcEngine2.getClient(newConf);
+ assertEquals(2, client.getConnectionIds().size());
+ } catch (ServiceException e) {
+ e.printStackTrace();
Review Comment:
We probably want to surface this.
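One way to act on this suggestion (a sketch only; the plain `Exception` below is a stand-in for the test's `ServiceException`, and the class/method names are hypothetical) is to wrap and rethrow instead of calling `printStackTrace()`:

```java
public class SurfaceExceptionSketch {
    // Stand-in for the RPC call in the test; always fails here so the
    // catch path is exercised.
    static void ping() throws Exception {
        throw new Exception("ping failed");
    }

    // Instead of e.printStackTrace(), wrap and rethrow so the test
    // framework reports the failure. (JUnit's fail(e) or declaring
    // `throws ServiceException` on the test method are alternatives.)
    static void run() {
        try {
            ping();
        } catch (Exception e) {
            throw new AssertionError("RPC call failed", e);
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // prints "RPC call failed"
        }
    }
}
```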
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java:
##########
@@ -154,11 +155,53 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
}
}
- protected static void stop(Server server, TestRpcService proxy) {
- if (proxy != null) {
- try {
- RPC.stopProxy(proxy);
- } catch (Exception ignored) {}
+ /**
+ * Try to obtain a proxy of TestRpcService with an index.
+ * @param serverAddr input server address
+ * @param clientConf input client configuration
+ * @param retryPolicy input retryPolicy
+ * @param index input index
+ * @return one proxy of TestRpcService
+ */
+ protected static TestRpcService getMultipleClientWithIndex(InetSocketAddress serverAddr,
+ Configuration clientConf, RetryPolicy retryPolicy, int index)
+ throws ServiceException, IOException {
+ MockConnectionId connectionId = new MockConnectionId(serverAddr,
+ TestRpcService.class, UserGroupInformation.getCurrentUser(),
+ RPC.getRpcTimeout(clientConf), retryPolicy, clientConf, index);
+ return getClient(connectionId, clientConf);
+ }
+
+ /**
+ * Obtain a TestRpcService Proxy by a connectionId.
+ * @param connId input connectionId
+ * @param clientConf input configuration
+ * @return a TestRpcService Proxy
+ * @throws ServiceException a ServiceException
+ */
+ protected static TestRpcService getClient(ConnectionId connId,
+ Configuration clientConf) throws ServiceException {
+ try {
+ return RPC.getProtocolProxy(
+ TestRpcService.class,
+ 0,
+ connId,
+ clientConf,
+ NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ protected static void stop(Server server, TestRpcService... proxies) {
+ if (proxies != null) {
+ for (TestRpcService proxy : proxies) {
Review Comment:
I believe that if proxies is null, `for (TestRpcService proxy : proxies)`
already works fine so no need to check if it's null.
Please double check.
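For reference, the enhanced `for` statement does throw `NullPointerException` on a null array; with varargs the array is only null when a caller passes an explicit null (or a typed null array), not when zero arguments are passed. A minimal sketch of the behavior (helper name hypothetical, not part of the PR):

```java
public class NullArrayIteration {
    // Hypothetical helper mirroring the stop(Server, TestRpcService...)
    // varargs signature: the enhanced for loop below throws
    // NullPointerException if items is null, rather than iterating zero times.
    static int count(String... items) {
        int n = 0;
        for (String item : items) { // NPE here when items == null
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(count("a", "b"));  // varargs box two args: 2
        System.out.println(count());           // empty varargs array: 0
        try {
            count((String[]) null);            // explicit null array
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException");
        }
    }
}
```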
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java:
##########
@@ -390,6 +399,56 @@ public void testProxyAddress() throws Exception {
}
}
+ @Test
+ public void testConnectionWithSocketFactory() throws IOException {
+ Server server;
+ TestRpcService firstProxy = null;
+ TestRpcService secondProxy = null;
+
+ Configuration newConf = new Configuration(conf);
+ newConf.set(CommonConfigurationKeysPublic.
+ HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+ RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+ newConf, "Test.No.Such.Key",
+ true, // defaultRetryPolicyEnabled = true
+ "Test.No.Such.Key", "10000,6",
+ null);
+
+ // create a server with two handlers
+ server = setupTestServer(newConf, 2);
Review Comment:
We could create it here:
```
Server server = setupTestServer(newConf, 2);
```
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java:
##########
@@ -390,6 +399,56 @@ public void testProxyAddress() throws Exception {
}
}
+ @Test
+ public void testConnectionWithSocketFactory() throws IOException {
+ Server server;
+ TestRpcService firstProxy = null;
+ TestRpcService secondProxy = null;
+
+ Configuration newConf = new Configuration(conf);
+ newConf.set(CommonConfigurationKeysPublic.
+ HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+ RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+ newConf, "Test.No.Such.Key",
+ true, // defaultRetryPolicyEnabled = true
+ "Test.No.Such.Key", "10000,6",
+ null);
+
+ // create a server with two handlers
+ server = setupTestServer(newConf, 2);
+ try {
+ // create the first client
+ firstProxy = getClient(addr, newConf);
+ // create the second client
+ secondProxy = getClient(addr, newConf);
+
+ firstProxy.ping(null, newEmptyRequest());
+ secondProxy.ping(null, newEmptyRequest());
+
+ Client client = ProtobufRpcEngine2.getClient(newConf);
+ assertEquals(1, client.getConnectionIds().size());
+
+ stop(null, firstProxy, secondProxy);
+ ProtobufRpcEngine2.clearClientCache();
+
+ // create the first client with index 1
+ firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
+ // create the second client with index 2
+ secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
+ firstProxy.ping(null, newEmptyRequest());
+ secondProxy.ping(null, newEmptyRequest());
+
+ client = ProtobufRpcEngine2.getClient(newConf);
Review Comment:
For readability maybe we can do:
```
Client client2 = ProtobufRpcEngine2.getClient(newConf);
assertEquals(2, client2.getConnectionIds().size());
```
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java:
##########
@@ -189,6 +232,36 @@ protected static int countThreads(String search) {
return count;
}
+ public static class MockConnectionId extends ConnectionId {
+ private static final int PRIME = 16777619;
+ private final int index;
+
+ public MockConnectionId(InetSocketAddress address, Class<?> protocol,
+ UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+ Configuration conf, int index) {
+ super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+ this.index = index;
+ }
+
+ @Override
+ public int hashCode() {
Review Comment:
Let's use:
org.apache.commons.lang3.builder.EqualsBuilder
org.apache.commons.lang3.builder.HashCodeBuilder
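The `HashCodeBuilder`/`EqualsBuilder` suggestion assumes commons-lang3 on the test classpath; the same idea can be sketched with plain JDK `java.util.Objects` (class names here are hypothetical stand-ins, not the real `ConnectionId`):

```java
import java.util.Objects;

public class MockIdSketch {
    // Hypothetical stand-in for ConnectionId: only the field feeding hashCode.
    static class BaseId {
        final String address;
        BaseId(String address) { this.address = address; }
        @Override public int hashCode() { return Objects.hash(address); }
        @Override public boolean equals(Object o) {
            return o instanceof BaseId && ((BaseId) o).address.equals(address);
        }
    }

    // Like the PR's MockConnectionId, fold an extra index into the parent's
    // hashCode/equals so two ids to the same address are distinct keys.
    static class MockId extends BaseId {
        final int index;
        MockId(String address, int index) { super(address); this.index = index; }
        @Override public int hashCode() {
            return Objects.hash(super.hashCode(), index);
        }
        @Override public boolean equals(Object o) {
            return o instanceof MockId && super.equals(o)
                && ((MockId) o).index == index;
        }
    }

    public static void main(String[] args) {
        MockId a1 = new MockId("host:8020", 1);
        MockId a2 = new MockId("host:8020", 2);
        System.out.println(a1.equals(a2));                         // false
        System.out.println(a1.equals(new MockId("host:8020", 1))); // true
    }
}
```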
Issue Time Tracking
-------------------
Worklog Id: (was: 789678)
Time Spent: 40m (was: 0.5h)
> Enhancing IPC client throughput via multiple connections per user
> -----------------------------------------------------------------
>
> Key: HADOOP-13144
> URL: https://issues.apache.org/jira/browse/HADOOP-13144
> Project: Hadoop Common
> Issue Type: Improvement
> Components: ipc
> Reporter: Jason Kace
> Assignee: Íñigo Goiri
> Priority: Minor
> Labels: pull-request-available
> Attachments: HADOOP-13144-performance.patch, HADOOP-13144.000.patch,
> HADOOP-13144.001.patch, HADOOP-13144.002.patch, HADOOP-13144.003.patch,
> HADOOP-13144_overload_enhancement.patch
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> The generic IPC client ({{org.apache.hadoop.ipc.Client}}) utilizes a single
> connection thread for each {{ConnectionId}}. The {{ConnectionId}} is unique
> to the connection's remote address, ticket and protocol. Each ConnectionId
> is 1:1 mapped to a connection thread by the client via a map cache.
> The result is to serialize all IPC read/write activity through a single
> thread for each user/ticket + address. If a single user makes repeated
> calls (1k-100k/sec) to the same destination, the IPC client becomes a
> bottleneck.
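> The cache behavior described above can be sketched as follows (the key
> fields and class names are illustrative, not the real
> org.apache.hadoop.ipc.Client internals):

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConnectionCacheSketch {
    // Illustrative stand-in for ConnectionId: identical
    // (address, user, protocol) tuples collapse to one cached connection.
    static final class Key {
        final String address, user, protocol;
        final int index; // the extra dimension proposed by HADOOP-13144

        Key(String address, String user, String protocol, int index) {
            this.address = address;
            this.user = user;
            this.protocol = protocol;
            this.index = index;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return index == k.index && address.equals(k.address)
                && user.equals(k.user) && protocol.equals(k.protocol);
        }

        @Override public int hashCode() {
            return Objects.hash(address, user, protocol, index);
        }
    }

    static final ConcurrentMap<Key, Object> CACHE = new ConcurrentHashMap<>();

    static Object getConnection(Key key) {
        // One connection object per distinct key, as in the Client's map cache.
        return CACHE.computeIfAbsent(key, k -> new Object());
    }

    public static void main(String[] args) {
        Key k0 = new Key("nn:8020", "alice", "ClientProtocol", 0);
        Key k0again = new Key("nn:8020", "alice", "ClientProtocol", 0);
        Key k1 = new Key("nn:8020", "alice", "ClientProtocol", 1);

        // Same user/address/protocol and index: one shared connection.
        System.out.println(getConnection(k0) == getConnection(k0again)); // true
        // A distinct index yields a second connection for the same user.
        System.out.println(getConnection(k0) == getConnection(k1));      // false
    }
}
```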
--
This message was sent by Atlassian Jira
(v8.20.10#820010)