This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9b1717f RATIS-1372. Support multi-client in FileStore (#475)
9b1717f is described below
commit 9b1717fa152e7de58edee5e64ab4ff181147c3f6
Author: runzhiwang <[email protected]>
AuthorDate: Wed Apr 28 21:35:18 2021 +0800
RATIS-1372. Support multi-client in FileStore (#475)
---
.../ratis/examples/filestore/cli/Client.java | 51 ++++++++++++++--------
.../ratis/examples/filestore/cli/DataStream.java | 18 ++++----
.../ratis/examples/filestore/cli/LoadGen.java | 15 ++++---
3 files changed, 52 insertions(+), 32 deletions(-)
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 009229a..77beffd 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -22,15 +22,16 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
-import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.examples.common.SubCommandBase;
+import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -66,11 +67,14 @@ public abstract class Client extends SubCommandBase {
@Parameter(names = {"--numFiles"}, description = "Number of files to be
written", required = true)
private int numFiles;
+ @Parameter(names = {"--numClients"}, description = "Number of clients to
write", required = true)
+ private int numClients;
+
@Parameter(names = {"--storage", "-s"}, description = "Storage dir, eg.
--storage dir1 --storage dir2",
required = true)
private List<File> storageDir = new ArrayList<>();
- private static final int MAX_THREADS_NUM = 100;
+ private static final int MAX_THREADS_NUM = 1000;
public int getNumThread() {
return numFiles < MAX_THREADS_NUM ? numFiles : MAX_THREADS_NUM;
@@ -111,27 +115,39 @@ public abstract class Client extends SubCommandBase {
TimeDuration.valueOf(50000, TimeUnit.MILLISECONDS));
RaftClientConfigKeys.Async.setOutstandingRequestsMax(raftProperties, 1000);
-
- final RaftGroup raftGroup =
RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(getRaftGroupId())),
- getPeers());
-
- RaftClient.Builder builder =
- RaftClient.newBuilder().setProperties(raftProperties);
- builder.setRaftGroup(raftGroup);
- builder.setClientRpc(new GrpcFactory(new
Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
- builder.setPrimaryDataStreamServer(getPrimary());
- RaftClient client = builder.build();
-
for (File dir : storageDir) {
FileUtils.createDirectories(dir);
}
- operation(client);
+ operation(getClients(raftProperties));
}
+ public List<FileStoreClient> getClients(RaftProperties raftProperties) {
+ List<FileStoreClient> fileStoreClients = new ArrayList<>();
+ for (int i = 0; i < numClients; i ++) {
+ final RaftGroup raftGroup =
RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(getRaftGroupId())),
+ getPeers());
+
+ RaftClient.Builder builder =
+ RaftClient.newBuilder().setProperties(raftProperties);
+ builder.setRaftGroup(raftGroup);
+ builder.setClientRpc(
+ new GrpcFactory(new org.apache.ratis.conf.Parameters())
+ .newRaftClientRpc(ClientId.randomId(), raftProperties));
+ RaftPeer[] peers = getPeers();
+ builder.setPrimaryDataStreamServer(peers[i % peers.length]);
+ RaftClient client = builder.build();
+ fileStoreClients.add(new FileStoreClient(client));
+ }
+ return fileStoreClients;
+ }
+
+
@SuppressFBWarnings("DM_EXIT")
- protected void stop(RaftClient client) throws IOException {
- client.close();
+ protected void stop(List<FileStoreClient> clients) throws IOException {
+ for (FileStoreClient client : clients) {
+ client.close();
+ }
System.exit(0);
}
@@ -199,5 +215,6 @@ public abstract class Client extends SubCommandBase {
return offset;
}
- protected abstract void operation(RaftClient client) throws IOException,
ExecutionException, InterruptedException;
+ protected abstract void operation(List<FileStoreClient> clients)
+ throws IOException, ExecutionException, InterruptedException;
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index b1e2599..bb9942a 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -19,7 +19,6 @@ package org.apache.ratis.examples.filestore.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.io.StandardWriteOption;
@@ -114,21 +113,20 @@ public class DataStream extends Client {
}
@Override
- protected void operation(RaftClient client) throws IOException,
ExecutionException, InterruptedException {
+ protected void operation(List<FileStoreClient> clients) throws IOException,
ExecutionException, InterruptedException {
if (!checkParam()) {
- stop(client);
+ stop(clients);
}
final ExecutorService executor =
Executors.newFixedThreadPool(getNumThread());
List<String> paths = generateFiles(executor);
dropCache();
- FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting DataStream write now ");
RoutingTable routingTable = getRoutingTable(Arrays.asList(getPeers()),
getPrimary());
long startTime = System.currentTimeMillis();
- long totalWrittenBytes = waitStreamFinish(streamWrite(paths,
fileStoreClient, routingTable, executor));
+ long totalWrittenBytes = waitStreamFinish(streamWrite(paths, clients,
routingTable, executor));
long endTime = System.currentTimeMillis();
@@ -137,16 +135,19 @@ public class DataStream extends Client {
System.out.println("Total data written: " + totalWrittenBytes + " bytes");
System.out.println("Total time taken: " + (endTime - startTime) + "
millis");
- stop(client);
+ stop(clients);
}
private Map<String,
CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
- List<String> paths, FileStoreClient fileStoreClient, RoutingTable
routingTable,
+ List<String> paths, List<FileStoreClient> clients, RoutingTable
routingTable,
ExecutorService executor) {
Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>>
fileMap = new HashMap<>();
+ int clientIndex = 0;
for(String path : paths) {
final CompletableFuture<List<CompletableFuture<DataStreamReply>>> future
= new CompletableFuture<>();
+ final FileStoreClient client = clients.get(clientIndex % clients.size());
+ clientIndex ++;
CompletableFuture.supplyAsync(() -> {
File file = new File(path);
final long fileLength = file.length();
@@ -156,8 +157,9 @@ public class DataStream extends Client {
final Type type =
Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
.orElseThrow(IllegalStateException::new);
final TransferType writer = type.getConstructor().apply(path, this);
+
try {
- future.complete(writer.transfer(fileStoreClient, routingTable));
+ future.complete(writer.transfer(client, routingTable));
} catch (IOException e) {
future.completeExceptionally(e);
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index faa0487..3fd4714 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -19,7 +19,6 @@ package org.apache.ratis.examples.filestore.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-import org.apache.ratis.client.RaftClient;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
@@ -47,16 +46,15 @@ public class LoadGen extends Client {
private int sync = 0;
@Override
- protected void operation(RaftClient client) throws IOException,
ExecutionException, InterruptedException {
+ protected void operation(List<FileStoreClient> clients) throws IOException,
ExecutionException, InterruptedException {
final ExecutorService executor =
Executors.newFixedThreadPool(getNumThread());
List<String> paths = generateFiles(executor);
dropCache();
- FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting Async write now ");
long startTime = System.currentTimeMillis();
- long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths,
fileStoreClient, executor));
+ long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths,
clients, executor));
long endTime = System.currentTimeMillis();
@@ -65,7 +63,7 @@ public class LoadGen extends Client {
System.out.println("Total data written: " + totalWrittenBytes + " bytes");
System.out.println("Total time taken: " + (endTime - startTime) + "
millis");
- stop(client);
+ stop(clients);
}
long write(FileChannel in, long offset, FileStoreClient fileStoreClient,
String path,
@@ -88,18 +86,21 @@ public class LoadGen extends Client {
}
private Map<String, CompletableFuture<List<CompletableFuture<Long>>>>
writeByHeapByteBuffer(
- List<String> paths, FileStoreClient fileStoreClient, ExecutorService
executor) {
+ List<String> paths, List<FileStoreClient> clients, ExecutorService
executor) {
Map<String, CompletableFuture<List<CompletableFuture<Long>>>> fileMap =
new HashMap<>();
+ int clientIndex = 0;
for(String path : paths) {
final CompletableFuture<List<CompletableFuture<Long>>> future = new
CompletableFuture<>();
+ final FileStoreClient client = clients.get(clientIndex % clients.size());
+ clientIndex ++;
CompletableFuture.supplyAsync(() -> {
List<CompletableFuture<Long>> futures = new ArrayList<>();
File file = new File(path);
try (FileInputStream fis = new FileInputStream(file)) {
final FileChannel in = fis.getChannel();
for (long offset = 0L; offset < getFileSizeInBytes(); ) {
- offset += write(in, offset, fileStoreClient, file.getName(),
futures);
+ offset += write(in, offset, client, file.getName(), futures);
}
} catch (Throwable e) {
future.completeExceptionally(e);