This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b6c9888  RATIS-1253. FileStore support routing table (#368)
b6c9888 is described below

commit b6c9888b1c828029cf39cb48dd2d100b17668fcc
Author: runzhiwang <[email protected]>
AuthorDate: Tue Dec 22 18:29:57 2020 +0800

    RATIS-1253. FileStore support routing table (#368)
---
 .../ratis/examples/common/SubCommandBase.java      | 16 +++++++++++++++
 .../ratis/examples/filestore/FileStoreClient.java  |  5 +++--
 .../ratis/examples/filestore/cli/DataStream.java   | 14 ++++++++-----
 .../filestore/FileStoreStreamingBaseTest.java      | 24 +++++++++++++---------
 .../ratis/examples/filestore/FileStoreWriter.java  |  5 +++--
 5 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index f168e79..2cf3520 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -20,7 +20,9 @@ package org.apache.ratis.examples.common;
 import com.beust.jcommander.Parameter;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.stream.Stream;
 
@@ -60,6 +62,20 @@ public abstract class SubCommandBase {
     return raftGroupId;
   }
 
+  public RoutingTable getRoutingTable(Collection<RaftPeer> raftPeers, RaftPeer primary) {
+    RoutingTable.Builder builder = RoutingTable.newBuilder();
+    RaftPeer previous = primary;
+    for (RaftPeer peer : raftPeers) {
+      if (peer.equals(primary)) {
+        continue;
+      }
+      builder.addSuccessor(previous.getId(), peer.getId());
+      previous = peer;
+    }
+
+    return builder.build();
+  }
+
   /**
    * @return the peer with the given id if it is in this group; otherwise, return null.
    */
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index d6e6851..9e91602 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
@@ -153,13 +154,13 @@ public class FileStoreClient implements Closeable {
     return WriteReplyProto.parseFrom(reply).getLength();
   }
 
-  public DataStreamOutput getStreamOutput(String path, long dataSize) {
+  public DataStreamOutput getStreamOutput(String path, long dataSize, RoutingTable routingTable) {
     final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setLength(dataSize)
         .build();
     final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setStream(header).build();
-    return client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer());
+    return client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer(), routingTable);
   }
 
   public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index c7443f6..b1e2599 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.ratis.util.JavaUtils;
@@ -124,9 +125,10 @@ public class DataStream extends Client {
     FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting DataStream write now ");
 
+    RoutingTable routingTable = getRoutingTable(Arrays.asList(getPeers()), getPrimary());
     long startTime = System.currentTimeMillis();
 
-    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, executor));
+    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, routingTable, executor));
 
     long endTime = System.currentTimeMillis();
 
@@ -139,7 +141,8 @@ public class DataStream extends Client {
   }
 
   private Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
-      List<String> paths, FileStoreClient fileStoreClient, ExecutorService executor) {
+      List<String> paths, FileStoreClient fileStoreClient, RoutingTable routingTable,
+      ExecutorService executor) {
     Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap = new HashMap<>();
 
     for(String path : paths) {
@@ -154,7 +157,7 @@ public class DataStream extends Client {
             .orElseThrow(IllegalStateException::new);
         final TransferType writer = type.getConstructor().apply(path, this);
         try {
-          future.complete(writer.transfer(fileStoreClient));
+          future.complete(writer.transfer(fileStoreClient, routingTable));
         } catch (IOException e) {
           future.completeExceptionally(e);
         }
@@ -225,13 +228,14 @@ public class DataStream extends Client {
       return false;
     }
 
-    List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
+    List<CompletableFuture<DataStreamReply>> transfer(
+        FileStoreClient client, RoutingTable routingTable) throws IOException {
       if (fileSize <= 0) {
         return Collections.emptyList();
       }
 
       final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-      final DataStreamOutput out = client.getStreamOutput(file.getName(), fileSize);
+      final DataStreamOutput out = client.getStreamOutput(file.getName(), fileSize, routingTable);
       try (FileInputStream fis = new FileInputStream(file)) {
         final FileChannel in = fis.getChannel();
         for (long offset = 0L; offset < fileSize; ) {
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index 59a83c5..bd10d28 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.LogUtils;
@@ -72,9 +73,10 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
     final CheckedSupplier<FileStoreClient, IOException> newClient =
         () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
 
-    testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient);
-    testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient);
-    testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient);
+    RoutingTable routingTable = getRoutingTable(peers, primary);
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient, routingTable);
+    testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient, routingTable);
+    testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient, routingTable);
 
     cluster.shutdown();
   }
@@ -93,15 +95,16 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
     final CheckedSupplier<FileStoreClient, IOException> newClient =
         () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
 
-    testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient);
-    testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient);
+    RoutingTable routingTable = getRoutingTable(peers, primary);
+    testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient, routingTable);
+    testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient, routingTable);
 
     cluster.shutdown();
   }
 
   private void testSingleFile(
-      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
-      IOException> newClient)
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient,
+      RoutingTable routingTable)
       throws Exception {
     LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
     FileStoreWriter.newBuilder()
@@ -109,11 +112,12 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
         .setFileSize(fileLength)
         .setBufferSize(bufferSize)
         .setFileStoreClientSupplier(newClient)
-        .build().streamWriteAndVerify();
+        .build().streamWriteAndVerify(routingTable);
   }
 
   private void testMultipleFiles(String pathBase, int numFile, SizeInBytes fileLength,
-      int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient) throws Exception {
+      int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient,
+      RoutingTable routingTable) throws Exception {
     final ExecutorService executor = Executors.newFixedThreadPool(numFile);
 
     final List<Future<FileStoreWriter>> writerFutures = new ArrayList<>();
@@ -125,7 +129,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
               .setFileSize(fileLength)
               .setBufferSize(bufferSize)
               .setFileStoreClientSupplier(newClient)
-              .build().streamWriteAndVerify(),
+              .build().streamWriteAndVerify(routingTable),
           () -> path);
       writerFutures.add(executor.submit(callable));
     }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 530a618..ac869d4 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -22,6 +22,7 @@ import org.apache.ratis.datastream.DataStreamTestUtils;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom;
 import org.apache.ratis.util.Preconditions;
@@ -134,9 +135,9 @@ class FileStoreWriter implements Closeable {
     return this;
   }
 
-  public FileStoreWriter streamWriteAndVerify() {
+  public FileStoreWriter streamWriteAndVerify(RoutingTable routingTable) {
     final int size = fileSize.getSizeInt();
-    final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName, size);
+    final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName, size, routingTable);
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     final List<Integer> sizes = new ArrayList<>();
 

Reply via email to