This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b1717f  RATIS-1372. Support multi-client in FileStore (#475)
9b1717f is described below

commit 9b1717fa152e7de58edee5e64ab4ff181147c3f6
Author: runzhiwang <[email protected]>
AuthorDate: Wed Apr 28 21:35:18 2021 +0800

    RATIS-1372. Support multi-client in FileStore (#475)
---
 .../ratis/examples/filestore/cli/Client.java       | 51 ++++++++++++++--------
 .../ratis/examples/filestore/cli/DataStream.java   | 18 ++++----
 .../ratis/examples/filestore/cli/LoadGen.java      | 15 ++++---
 3 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 009229a..77beffd 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -22,15 +22,16 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
-import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.examples.common.SubCommandBase;
+import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -66,11 +67,14 @@ public abstract class Client extends SubCommandBase {
   @Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
   private int numFiles;
 
+  @Parameter(names = {"--numClients"}, description = "Number of clients to write", required = true)
+  private int numClients;
+
   @Parameter(names = {"--storage", "-s"}, description = "Storage dir, eg. --storage dir1 --storage dir2",
       required = true)
   private List<File> storageDir = new ArrayList<>();
 
-  private static final int MAX_THREADS_NUM = 100;
+  private static final int MAX_THREADS_NUM = 1000;
 
   public int getNumThread() {
     return numFiles < MAX_THREADS_NUM ? numFiles : MAX_THREADS_NUM;
@@ -111,27 +115,39 @@ public abstract class Client extends SubCommandBase {
         TimeDuration.valueOf(50000, TimeUnit.MILLISECONDS));
     RaftClientConfigKeys.Async.setOutstandingRequestsMax(raftProperties, 1000);
 
-
-    final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(getRaftGroupId())),
-            getPeers());
-
-    RaftClient.Builder builder =
-        RaftClient.newBuilder().setProperties(raftProperties);
-    builder.setRaftGroup(raftGroup);
-    builder.setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
-    builder.setPrimaryDataStreamServer(getPrimary());
-    RaftClient client = builder.build();
-
     for (File dir : storageDir) {
       FileUtils.createDirectories(dir);
     }
 
-    operation(client);
+    operation(getClients(raftProperties));
   }
 
+  public List<FileStoreClient> getClients(RaftProperties raftProperties) {
+    List<FileStoreClient> fileStoreClients = new ArrayList<>();
+    for (int i = 0; i < numClients; i ++) {
+      final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(getRaftGroupId())),
+          getPeers());
+
+      RaftClient.Builder builder =
+          RaftClient.newBuilder().setProperties(raftProperties);
+      builder.setRaftGroup(raftGroup);
+      builder.setClientRpc(
+          new GrpcFactory(new org.apache.ratis.conf.Parameters())
+              .newRaftClientRpc(ClientId.randomId(), raftProperties));
+      RaftPeer[] peers = getPeers();
+      builder.setPrimaryDataStreamServer(peers[i % peers.length]);
+      RaftClient client = builder.build();
+      fileStoreClients.add(new FileStoreClient(client));
+    }
+    return fileStoreClients;
+  }
+
+
   @SuppressFBWarnings("DM_EXIT")
-  protected void stop(RaftClient client) throws IOException {
-    client.close();
+  protected void stop(List<FileStoreClient> clients) throws IOException {
+    for (FileStoreClient client : clients) {
+      client.close();
+    }
     System.exit(0);
   }
 
@@ -199,5 +215,6 @@ public abstract class Client extends SubCommandBase {
     return offset;
   }
 
-  protected abstract void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException;
+  protected abstract void operation(List<FileStoreClient> clients)
+      throws IOException, ExecutionException, InterruptedException;
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index b1e2599..bb9942a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -19,7 +19,6 @@ package org.apache.ratis.examples.filestore.cli;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.io.StandardWriteOption;
@@ -114,21 +113,20 @@ public class DataStream extends Client {
   }
 
   @Override
-  protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
+  protected void operation(List<FileStoreClient> clients) throws IOException, ExecutionException, InterruptedException {
     if (!checkParam()) {
-      stop(client);
+      stop(clients);
     }
 
     final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
     List<String> paths = generateFiles(executor);
     dropCache();
-    FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting DataStream write now ");
 
     RoutingTable routingTable = getRoutingTable(Arrays.asList(getPeers()), getPrimary());
     long startTime = System.currentTimeMillis();
 
-    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, routingTable, executor));
+    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, clients, routingTable, executor));
 
     long endTime = System.currentTimeMillis();
 
@@ -137,16 +135,19 @@ public class DataStream extends Client {
     System.out.println("Total data written: " + totalWrittenBytes + " bytes");
     System.out.println("Total time taken: " + (endTime - startTime) + " millis");
 
-    stop(client);
+    stop(clients);
   }
 
   private Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
-      List<String> paths, FileStoreClient fileStoreClient, RoutingTable routingTable,
+      List<String> paths, List<FileStoreClient> clients, RoutingTable routingTable,
       ExecutorService executor) {
     Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap = new HashMap<>();
 
+    int clientIndex = 0;
     for(String path : paths) {
       final CompletableFuture<List<CompletableFuture<DataStreamReply>>> future = new CompletableFuture<>();
+      final FileStoreClient client = clients.get(clientIndex % clients.size());
+      clientIndex ++;
       CompletableFuture.supplyAsync(() -> {
         File file = new File(path);
         final long fileLength = file.length();
@@ -156,8 +157,9 @@ public class DataStream extends Client {
         final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
             .orElseThrow(IllegalStateException::new);
         final TransferType writer = type.getConstructor().apply(path, this);
+
         try {
-          future.complete(writer.transfer(fileStoreClient, routingTable));
+          future.complete(writer.transfer(client, routingTable));
         } catch (IOException e) {
           future.completeExceptionally(e);
         }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index faa0487..3fd4714 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -19,7 +19,6 @@ package org.apache.ratis.examples.filestore.cli;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
@@ -47,16 +46,15 @@ public class LoadGen extends Client {
   private int sync = 0;
 
   @Override
-  protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
+  protected void operation(List<FileStoreClient> clients) throws IOException, ExecutionException, InterruptedException {
     final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
     List<String> paths = generateFiles(executor);
     dropCache();
-    FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting Async write now ");
 
     long startTime = System.currentTimeMillis();
 
-    long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient, executor));
+    long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, clients, executor));
 
     long endTime = System.currentTimeMillis();
 
@@ -65,7 +63,7 @@ public class LoadGen extends Client {
     System.out.println("Total data written: " + totalWrittenBytes + " bytes");
     System.out.println("Total time taken: " + (endTime - startTime) + " millis");
 
-    stop(client);
+    stop(clients);
   }
 
   long write(FileChannel in, long offset, FileStoreClient fileStoreClient, String path,
@@ -88,18 +86,21 @@ public class LoadGen extends Client {
   }
 
   private Map<String, CompletableFuture<List<CompletableFuture<Long>>>> writeByHeapByteBuffer(
-      List<String> paths, FileStoreClient fileStoreClient, ExecutorService executor) {
+      List<String> paths, List<FileStoreClient> clients, ExecutorService executor) {
     Map<String, CompletableFuture<List<CompletableFuture<Long>>>> fileMap = new HashMap<>();
 
+    int clientIndex = 0;
     for(String path : paths) {
       final CompletableFuture<List<CompletableFuture<Long>>> future = new CompletableFuture<>();
+      final FileStoreClient client = clients.get(clientIndex % clients.size());
+      clientIndex ++;
       CompletableFuture.supplyAsync(() -> {
         List<CompletableFuture<Long>> futures = new ArrayList<>();
         File file = new File(path);
         try (FileInputStream fis = new FileInputStream(file)) {
           final FileChannel in = fis.getChannel();
           for (long offset = 0L; offset < getFileSizeInBytes(); ) {
-            offset += write(in, offset, fileStoreClient, file.getName(), futures);
+            offset += write(in, offset, client, file.getName(), futures);
           }
         } catch (Throwable e) {
           future.completeExceptionally(e);

Reply via email to