This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b6c9888 RATIS-1253. FileStore support routing table (#368)
b6c9888 is described below
commit b6c9888b1c828029cf39cb48dd2d100b17668fcc
Author: runzhiwang <[email protected]>
AuthorDate: Tue Dec 22 18:29:57 2020 +0800
RATIS-1253. FileStore support routing table (#368)
---
.../ratis/examples/common/SubCommandBase.java | 16 +++++++++++++++
.../ratis/examples/filestore/FileStoreClient.java | 5 +++--
.../ratis/examples/filestore/cli/DataStream.java | 14 ++++++++-----
.../filestore/FileStoreStreamingBaseTest.java | 24 +++++++++++++---------
.../ratis/examples/filestore/FileStoreWriter.java | 5 +++--
5 files changed, 45 insertions(+), 19 deletions(-)
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index f168e79..2cf3520 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -20,7 +20,9 @@ package org.apache.ratis.examples.common;
import com.beust.jcommander.Parameter;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import java.util.Collection;
import java.util.Objects;
import java.util.stream.Stream;
@@ -60,6 +62,20 @@ public abstract class SubCommandBase {
return raftGroupId;
}
+ public RoutingTable getRoutingTable(Collection<RaftPeer> raftPeers, RaftPeer
primary) {
+ RoutingTable.Builder builder = RoutingTable.newBuilder();
+ RaftPeer previous = primary;
+ for (RaftPeer peer : raftPeers) {
+ if (peer.equals(primary)) {
+ continue;
+ }
+ builder.addSuccessor(previous.getId(), peer.getId());
+ previous = peer;
+ }
+
+ return builder.build();
+ }
+
/**
* @return the peer with the given id if it is in this group; otherwise,
return null.
*/
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index d6e6851..9e91602 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
@@ -153,13 +154,13 @@ public class FileStoreClient implements Closeable {
return WriteReplyProto.parseFrom(reply).getLength();
}
- public DataStreamOutput getStreamOutput(String path, long dataSize) {
+ public DataStreamOutput getStreamOutput(String path, long dataSize,
RoutingTable routingTable) {
final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setLength(dataSize)
.build();
final FileStoreRequestProto request =
FileStoreRequestProto.newBuilder().setStream(header).build();
- return
client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer());
+ return
client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer(),
routingTable);
}
public CompletableFuture<Long> writeAsync(String path, long offset, boolean
close, ByteBuffer buffer, boolean sync) {
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index c7443f6..b1e2599 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.util.JavaUtils;
@@ -124,9 +125,10 @@ public class DataStream extends Client {
FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting DataStream write now ");
+ RoutingTable routingTable = getRoutingTable(Arrays.asList(getPeers()),
getPrimary());
long startTime = System.currentTimeMillis();
- long totalWrittenBytes = waitStreamFinish(streamWrite(paths,
fileStoreClient, executor));
+ long totalWrittenBytes = waitStreamFinish(streamWrite(paths,
fileStoreClient, routingTable, executor));
long endTime = System.currentTimeMillis();
@@ -139,7 +141,8 @@ public class DataStream extends Client {
}
private Map<String,
CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
- List<String> paths, FileStoreClient fileStoreClient, ExecutorService
executor) {
+ List<String> paths, FileStoreClient fileStoreClient, RoutingTable
routingTable,
+ ExecutorService executor) {
Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>>
fileMap = new HashMap<>();
for(String path : paths) {
@@ -154,7 +157,7 @@ public class DataStream extends Client {
.orElseThrow(IllegalStateException::new);
final TransferType writer = type.getConstructor().apply(path, this);
try {
- future.complete(writer.transfer(fileStoreClient));
+ future.complete(writer.transfer(fileStoreClient, routingTable));
} catch (IOException e) {
future.completeExceptionally(e);
}
@@ -225,13 +228,14 @@ public class DataStream extends Client {
return false;
}
- List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client)
throws IOException {
+ List<CompletableFuture<DataStreamReply>> transfer(
+ FileStoreClient client, RoutingTable routingTable) throws IOException {
if (fileSize <= 0) {
return Collections.emptyList();
}
final List<CompletableFuture<DataStreamReply>> futures = new
ArrayList<>();
- final DataStreamOutput out = client.getStreamOutput(file.getName(),
fileSize);
+ final DataStreamOutput out = client.getStreamOutput(file.getName(),
fileSize, routingTable);
try (FileInputStream fis = new FileInputStream(file)) {
final FileChannel in = fis.getChannel();
for (long offset = 0L; offset < fileSize; ) {
diff --git
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index 59a83c5..bd10d28 100644
---
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.LogUtils;
@@ -72,9 +73,10 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER
extends MiniRaftCluste
final CheckedSupplier<FileStoreClient, IOException> newClient =
() -> new FileStoreClient(cluster.getGroup(), getProperties(),
primary);
- testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient);
- testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient);
- testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient);
+ RoutingTable routingTable = getRoutingTable(peers, primary);
+ testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient,
routingTable);
+ testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient,
routingTable);
+ testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient,
routingTable);
cluster.shutdown();
}
@@ -93,15 +95,16 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER
extends MiniRaftCluste
final CheckedSupplier<FileStoreClient, IOException> newClient =
() -> new FileStoreClient(cluster.getGroup(), getProperties(),
primary);
- testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient);
- testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient);
+ RoutingTable routingTable = getRoutingTable(peers, primary);
+ testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient,
routingTable);
+ testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient,
routingTable);
cluster.shutdown();
}
private void testSingleFile(
- String path, SizeInBytes fileLength, int bufferSize,
CheckedSupplier<FileStoreClient,
- IOException> newClient)
+ String path, SizeInBytes fileLength, int bufferSize,
CheckedSupplier<FileStoreClient, IOException> newClient,
+ RoutingTable routingTable)
throws Exception {
LOG.info("runTestSingleFile with path={}, fileLength={}", path,
fileLength);
FileStoreWriter.newBuilder()
@@ -109,11 +112,12 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER
extends MiniRaftCluste
.setFileSize(fileLength)
.setBufferSize(bufferSize)
.setFileStoreClientSupplier(newClient)
- .build().streamWriteAndVerify();
+ .build().streamWriteAndVerify(routingTable);
}
private void testMultipleFiles(String pathBase, int numFile, SizeInBytes
fileLength,
- int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient)
throws Exception {
+ int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient,
+ RoutingTable routingTable) throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(numFile);
final List<Future<FileStoreWriter>> writerFutures = new ArrayList<>();
@@ -125,7 +129,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER
extends MiniRaftCluste
.setFileSize(fileLength)
.setBufferSize(bufferSize)
.setFileStoreClientSupplier(newClient)
- .build().streamWriteAndVerify(),
+ .build().streamWriteAndVerify(routingTable),
() -> path);
writerFutures.add(executor.submit(callable));
}
diff --git
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 530a618..ac869d4 100644
---
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -22,6 +22,7 @@ import org.apache.ratis.datastream.DataStreamTestUtils;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom;
import org.apache.ratis.util.Preconditions;
@@ -134,9 +135,9 @@ class FileStoreWriter implements Closeable {
return this;
}
- public FileStoreWriter streamWriteAndVerify() {
+ public FileStoreWriter streamWriteAndVerify(RoutingTable routingTable) {
final int size = fileSize.getSizeInt();
- final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName,
size);
+ final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName,
size, routingTable);
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
final List<Integer> sizes = new ArrayList<>();