[ 
https://issues.apache.org/jira/browse/HADOOP-13144?focusedWorklogId=789678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-789678
 ]

ASF GitHub Bot logged work on HADOOP-13144:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jul/22 17:39
            Start Date: 11/Jul/22 17:39
    Worklog Time Spent: 10m 
      Work Description: goiri commented on code in PR #4542:
URL: https://github.com/apache/hadoop/pull/4542#discussion_r918190241


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java:
##########
@@ -390,6 +399,56 @@ public void testProxyAddress() throws Exception {
     }
   }
 
+  @Test
+  public void testConnectionWithSocketFactory() throws IOException {
+    Server server;
+    TestRpcService firstProxy = null;
+    TestRpcService secondProxy = null;
+
+    Configuration newConf = new Configuration(conf);
+    newConf.set(CommonConfigurationKeysPublic.
+        HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+    RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+        newConf, "Test.No.Such.Key",
+        true,                     // defaultRetryPolicyEnabled = true
+        "Test.No.Such.Key", "10000,6",
+        null);
+
+    // create a server with two handlers
+    server = setupTestServer(newConf, 2);
+    try {
+      // create the first client
+      firstProxy = getClient(addr, newConf);
+      // create the second client
+      secondProxy = getClient(addr, newConf);
+
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      Client client = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(1, client.getConnectionIds().size());
+
+      stop(null, firstProxy, secondProxy);
+      ProtobufRpcEngine2.clearClientCache();
+
+      // create the first client with index 1
+      firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
+      // create the second client with index 2
+      secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      client = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(2, client.getConnectionIds().size());
+    } catch (ServiceException e) {
+      e.printStackTrace();

Review Comment:
   We probably want to surface this.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java:
##########
@@ -154,11 +155,53 @@ protected static TestRpcService 
getClient(InetSocketAddress serverAddr,
     }
   }
 
-  protected static void stop(Server server, TestRpcService proxy) {
-    if (proxy != null) {
-      try {
-        RPC.stopProxy(proxy);
-      } catch (Exception ignored) {}
+  /**
+   * Try to obtain a proxy of TestRpcService with an index.
+   * @param serverAddr input server address
+   * @param clientConf input client configuration
+   * @param retryPolicy input retryPolicy
+   * @param index input index
+   * @return one proxy of TestRpcService
+   */
+  protected static TestRpcService getMultipleClientWithIndex(InetSocketAddress 
serverAddr,
+      Configuration clientConf, RetryPolicy retryPolicy, int index)
+      throws ServiceException, IOException {
+    MockConnectionId connectionId = new MockConnectionId(serverAddr,
+        TestRpcService.class, UserGroupInformation.getCurrentUser(),
+        RPC.getRpcTimeout(clientConf), retryPolicy, clientConf, index);
+    return getClient(connectionId, clientConf);
+  }
+
+  /**
+   * Obtain a TestRpcService Proxy by a connectionId.
+   * @param connId input connectionId
+   * @param clientConf  input configuration
+   * @return a TestRpcService Proxy
+   * @throws ServiceException a ServiceException
+   */
+  protected static TestRpcService getClient(ConnectionId connId,
+      Configuration clientConf) throws ServiceException {
+    try {
+      return RPC.getProtocolProxy(
+          TestRpcService.class,
+          0,
+          connId,
+          clientConf,
+          NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  protected static void stop(Server server, TestRpcService... proxies) {
+    if (proxies != null) {
+      for (TestRpcService proxy : proxies) {

Review Comment:
   I believe that if proxies is null, `for (TestRpcService proxy : proxies)` 
already works fine so no need to check if it's null.
   Please double check.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java:
##########
@@ -390,6 +399,56 @@ public void testProxyAddress() throws Exception {
     }
   }
 
+  @Test
+  public void testConnectionWithSocketFactory() throws IOException {
+    Server server;
+    TestRpcService firstProxy = null;
+    TestRpcService secondProxy = null;
+
+    Configuration newConf = new Configuration(conf);
+    newConf.set(CommonConfigurationKeysPublic.
+        HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+    RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+        newConf, "Test.No.Such.Key",
+        true,                     // defaultRetryPolicyEnabled = true
+        "Test.No.Such.Key", "10000,6",
+        null);
+
+    // create a server with two handlers
+    server = setupTestServer(newConf, 2);

Review Comment:
   We could create it here:
   ```
   Server server = setupTestServer(newConf, 2);
   ```



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java:
##########
@@ -390,6 +399,56 @@ public void testProxyAddress() throws Exception {
     }
   }
 
+  @Test
+  public void testConnectionWithSocketFactory() throws IOException {
+    Server server;
+    TestRpcService firstProxy = null;
+    TestRpcService secondProxy = null;
+
+    Configuration newConf = new Configuration(conf);
+    newConf.set(CommonConfigurationKeysPublic.
+        HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+    RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+        newConf, "Test.No.Such.Key",
+        true,                     // defaultRetryPolicyEnabled = true
+        "Test.No.Such.Key", "10000,6",
+        null);
+
+    // create a server with two handlers
+    server = setupTestServer(newConf, 2);
+    try {
+      // create the first client
+      firstProxy = getClient(addr, newConf);
+      // create the second client
+      secondProxy = getClient(addr, newConf);
+
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      Client client = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(1, client.getConnectionIds().size());
+
+      stop(null, firstProxy, secondProxy);
+      ProtobufRpcEngine2.clearClientCache();
+
+      // create the first client with index 1
+      firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
+      // create the second client with index 2
+      secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      client = ProtobufRpcEngine2.getClient(newConf);

Review Comment:
   For readability maybe we can do:
   ```
   Client client2 = ProtobufRpcEngine2.getClient(newConf);
   assertEquals(2, client2.getConnectionIds().size());
   ```



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java:
##########
@@ -189,6 +232,36 @@ protected static int countThreads(String search) {
     return count;
   }
 
+  public static class MockConnectionId extends ConnectionId {
+    private static final int PRIME = 16777619;
+    private final int index;
+
+    public MockConnectionId(InetSocketAddress address, Class<?> protocol,
+        UserGroupInformation ticket, int rpcTimeout, RetryPolicy 
connectionRetryPolicy,
+        Configuration conf, int index) {
+      super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, 
conf);
+      this.index = index;
+    }
+
+    @Override
+    public int hashCode() {

Review Comment:
   Let's use:
   org.apache.commons.lang3.builder.EqualsBuilder
   org.apache.commons.lang3.builder.HashCodeBuilder
   





Issue Time Tracking
-------------------

    Worklog Id:     (was: 789678)
    Time Spent: 40m  (was: 0.5h)

> Enhancing IPC client throughput via multiple connections per user
> -----------------------------------------------------------------
>
>                 Key: HADOOP-13144
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13144
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: ipc
>            Reporter: Jason Kace
>            Assignee: Íñigo Goiri
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: HADOOP-13144-performance.patch, HADOOP-13144.000.patch, 
> HADOOP-13144.001.patch, HADOOP-13144.002.patch, HADOOP-13144.003.patch, 
> HADOOP-13144_overload_enhancement.patch
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> The generic IPC client ({{org.apache.hadoop.ipc.Client}}) utilizes a single 
> connection thread for each {{ConnectionId}}.  The {{ConnectionId}} is unique 
> to the connection's remote address, ticket and protocol.  Each ConnectionId 
> is 1:1 mapped to a connection thread by the client via a map cache.
> The result is to serialize all IPC read/write activity through a single 
> thread for a each user/ticket + address.  If a single user makes repeated 
> calls (1k-100k/sec) to the same destination, the IPC client becomes a 
> bottleneck.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to