bharatviswa504 commented on a change in pull request #714: HDDS-1406. Avoid 
usage of commonPool in RatisPipelineUtils.
URL: https://github.com/apache/hadoop/pull/714#discussion_r285837951
 
 

 ##########
 File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 ##########
 @@ -133,7 +173,86 @@ public Pipeline create(ReplicationFactor factor,
         .build();
   }
 
+
+  @Override
+  public void shutdown() {
+    forkJoinPool.shutdownNow();
+  }
+
   protected void initializePipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.createPipeline(pipeline, conf);
+    createPipeline(pipeline);
+  }
+
+  /**
+   * Sends ratis command to create pipeline on all the datanodes.
+   *
+   * @param pipeline  - Pipeline to be created
+   * @throws IOException if creation fails
+   */
+  public void createPipeline(Pipeline pipeline)
+      throws IOException {
+    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+    LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
+    callRatisRpc(pipeline.getNodes(),
+        (raftClient, peer) -> {
+          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
+          if (reply == null || !reply.isSuccess()) {
+            String msg = "Pipeline initialization failed for pipeline:"
+                + pipeline.getId() + " node:" + peer.getId();
+            LOG.error(msg);
+            throw new IOException(msg);
+          }
+        });
+  }
+
+  private void callRatisRpc(List<DatanodeDetails> datanodes,
+      CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
+      throws IOException {
+    if (datanodes.isEmpty()) {
+      return;
+    }
+
+    final String rpcType = conf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
+    final List< IOException > exceptions =
+        Collections.synchronizedList(new ArrayList<>());
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(conf);
+    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
+        SecurityConfig(conf));
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(conf);
+    try {
+      forkJoinPool.submit(() -> {
 
 Review comment:
   @lokeshj1703 Sorry missed this comment earlier.
   Checked this, one of the forkJoinPool thread is used for waiting and the 
same is being used in one of the calls for Ratis with 3 pipeline.
   
   **Output:**
   The below line is from after Submit.
   Thread name RATISCREATEPIPELINE1
   `      forkJoinPool.submit(() -> {`
   These below log lines are inside ParallelStream
   `        datanodes.parallelStream().forEach(d -> {`
   Internal thread name RATISCREATEPIPELINE1
   Internal thread name RATISCREATEPIPELINE3
   Internal thread name RATISCREATEPIPELINE2
   
   So, I think we should be fine with parallelism set to 3. I even tried with 
4, but I still see the same above output.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to