This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a9ae9a  RATIS-1173. Rename StateMachineDataChannel to DataChannel. (#295)
0a9ae9a is described below

commit 0a9ae9ae7176a0ec7d5749868005225a0644b1ca
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 24 14:29:46 2020 +0800

    RATIS-1173. Rename StateMachineDataChannel to DataChannel. (#295)
---
 .../apache/ratis/protocol/ClientInvocationId.java  |  8 +++
 .../ratis/netty/server/DataStreamManagement.java   |  6 +-
 .../apache/ratis/statemachine/StateMachine.java    |  4 +-
 .../datastream/DataStreamAsyncClusterTests.java    | 16 +++++-
 .../ratis/datastream/DataStreamBaseTest.java       | 12 ++--
 .../ratis/datastream/DataStreamClusterTests.java   | 38 ++-----------
 .../ratis/datastream/DataStreamTestUtils.java      | 64 +++++++++++++++++-----
 7 files changed, 88 insertions(+), 60 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java
index 679b85c..24e491e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java
@@ -20,6 +20,7 @@ package org.apache.ratis.protocol;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * The id of a client invocation.
@@ -57,6 +58,13 @@ public final class ClientInvocationId {
     return longId;
   }
 
+  public boolean match(StateMachineLogEntryProto proto) {
+    return longId == proto.getCallId() && Optional.ofNullable(clientId)
+        .map(RaftId::toByteString)
+        .filter(b -> b.equals(proto.getClientId()))
+        .isPresent();
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index a78bd1f..14d4196 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -36,7 +36,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServer.Division;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
+import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -251,7 +251,7 @@ public class DataStreamManagement {
   }
 
   static long writeTo(ByteBuf buf, boolean sync, DataStream stream) {
-    final StateMachineDataChannel channel = stream.getWritableByteChannel();
+    final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
       try {
@@ -273,7 +273,7 @@ public class DataStreamManagement {
 
   static long close(DataStream stream) {
     try {
-      stream.getWritableByteChannel().close();
+      stream.getDataChannel().close();
       return 0L;
     } catch (IOException e) {
       throw new CompletionException("Failed to close " + stream, e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 32c5380..95c6c41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -238,7 +238,7 @@ public interface StateMachine extends Closeable {
   /**
    * For write state machine data.
    */
-  interface StateMachineDataChannel extends WritableByteChannel {
+  interface DataChannel extends WritableByteChannel {
     void force(boolean metadata) throws IOException;
   }
 
@@ -247,7 +247,7 @@ public interface StateMachine extends Closeable {
    */
   interface DataStream {
     /** @return a channel for streaming state machine data. */
-    StateMachineDataChannel getWritableByteChannel();
+    DataChannel getDataChannel();
 
     /**
      * Clean up asynchronously this stream.
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 3c98523..ae5ac90 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -27,9 +27,11 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -40,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluster>
     extends DataStreamClusterTests<CLUSTER> {
@@ -52,7 +55,17 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
 
   @Test
   public void testMultipleStreamsMultipleServers() throws Exception {
+    // Avoid changing leader
+    final TimeDuration min = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
+    RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), TimeDuration.valueOf(2, TimeUnit.SECONDS));
+    final TimeDuration max = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
+    RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), TimeDuration.valueOf(3, TimeUnit.SECONDS));
+
     runWithNewCluster(3, this::runTestDataStream);
+
+    // Reset
+    RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), min);
+    RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), max);
   }
 
   void runTestDataStream(CLUSTER cluster) throws Exception {
@@ -75,7 +88,8 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
       final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
       final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) impl.getStateMachine();
       for (SingleDataStream s : stateMachine.getStreams()) {
-        Assert.assertNotNull(s.getLogEntry());
+        Assert.assertFalse(s.getDataChannel().isOpen());
+        DataStreamTestUtils.assertLogEntry(impl, s);
       }
     }
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0f8d314..46eeed1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -23,7 +23,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamTestUtils.DataChannel;
+import org.apache.ratis.datastream.DataStreamTestUtils.MyDataChannel;
 import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -55,7 +55,7 @@ import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
+import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.NetUtils;
@@ -211,13 +211,13 @@ abstract class DataStreamBaseTest extends BaseTest {
         final MyDivision d = getDivision(request.getRaftGroupId());
         return d.getDataStreamMap()
             .remove(ClientInvocationId.valueOf(request))
-            .thenApply(StateMachine.DataStream::getWritableByteChannel)
+            .thenApply(StateMachine.DataStream::getDataChannel)
             .thenApply(channel -> buildRaftClientReply(request, channel));
       }
 
-      static RaftClientReply buildRaftClientReply(RaftClientRequest request, StateMachineDataChannel channel) {
-        Assert.assertTrue(channel instanceof DataChannel);
-        final DataChannel dataChannel = (DataChannel) channel;
+      static RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
+        Assert.assertTrue(channel instanceof MyDataChannel);
+        final MyDataChannel dataChannel = (MyDataChannel) channel;
         return RaftClientReply.newBuilder()
             .setRequest(request)
             .setSuccess()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 28dbc87..2e84546 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -21,23 +21,18 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
@@ -45,10 +40,6 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
   {
     setStateMachine(MultiDataStreamStateMachine.class);
-
-    // Avoid changing leader
-    RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), TimeDuration.valueOf(2, TimeUnit.SECONDS));
-    RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), TimeDuration.valueOf(3, TimeUnit.SECONDS));
   }
 
   public static final int NUM_SERVERS = 3;
@@ -66,38 +57,19 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
     Assert.assertEquals(NUM_SERVERS, peers.size());
     RaftPeer raftPeer = peers.iterator().next();
 
-    final ClientId clientId;
-    final long callId;
+    final RaftClientRequest request;
     try (RaftClient client = cluster.createClient(raftPeer)) {
-      clientId = client.getId();
-
       // send a stream request
       try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
         DataStreamTestUtils.writeAndAssertReplies(out, 1000, 10);
-        callId = out.getHeader().getCallId();
+        request = out.getHeader();
       }
     }
 
     // verify the write request is in the Raft log.
     final RaftLog log = leader.getState().getLog();
-    final LogEntryProto entry = findLogEntry(clientId, callId, log);
+    final LogEntryProto entry = DataStreamTestUtils.searchLogEntry(ClientInvocationId.valueOf(request), log);
     LOG.info("entry={}", entry);
     Assert.assertNotNull(entry);
   }
-
-  static LogEntryProto findLogEntry(ClientId clientId, long callId, RaftLog log) throws Exception {
-    for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
-      final LogEntryProto entry = log.get(termIndex.getIndex());
-      if (entry.hasStateMachineLogEntry()) {
-        final StateMachineLogEntryProto stateMachineEntry = entry.getStateMachineLogEntry();
-        if (stateMachineEntry.getCallId() == callId) {
-          if (clientId.toByteString().equals(stateMachineEntry.getClientId())) {
-            return entry;
-          }
-        }
-      }
-    }
-    return null;
-  }
-
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 2f695a0..177d1ff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -24,6 +24,7 @@ import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.Message;
@@ -33,9 +34,12 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -66,6 +70,18 @@ public interface DataStreamTestUtils {
     return (byte) ('A' + pos % MODULUS);
   }
 
+  static ByteBuffer initBuffer(int offset, int size) {
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+    final int length = buffer.capacity();
+    buffer.position(0).limit(length);
+    for (int j = 0; j < length; j++) {
+      buffer.put(pos2byte(offset + j));
+    }
+    buffer.flip();
+    Assert.assertEquals(length, buffer.remaining());
+    return buffer;
+  }
+
   static ByteString bytesWritten2ByteString(long bytesWritten) {
     return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
   }
@@ -95,7 +111,7 @@ public interface DataStreamTestUtils {
       final LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
       updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
       final SingleDataStream s = getSingleDataStream(ClientInvocationId.valueOf(entry.getStateMachineLogEntry()));
-      final ByteString bytesWritten = bytesWritten2ByteString(s.getWritableByteChannel().getBytesWritten());
+      final ByteString bytesWritten = bytesWritten2ByteString(s.getDataChannel().getBytesWritten());
       return CompletableFuture.completedFuture(() -> bytesWritten);
     }
 
@@ -114,7 +130,7 @@ public interface DataStreamTestUtils {
 
   class SingleDataStream implements DataStream {
     private final RaftClientRequest writeRequest;
-    private final DataChannel channel = new DataChannel();
+    private final MyDataChannel channel = new MyDataChannel();
     private volatile LogEntryProto logEntry;
 
     SingleDataStream(RaftClientRequest request) {
@@ -122,7 +138,7 @@ public interface DataStreamTestUtils {
     }
 
     @Override
-    public DataChannel getWritableByteChannel() {
+    public MyDataChannel getDataChannel() {
       return channel;
     }
 
@@ -155,7 +171,7 @@ public interface DataStreamTestUtils {
     }
   }
 
-  class DataChannel implements StateMachineDataChannel {
+  class MyDataChannel implements DataChannel {
     private volatile boolean open = true;
     private int bytesWritten = 0;
     private int forcedPosition = 0;
@@ -253,7 +269,7 @@ public interface DataStreamTestUtils {
     // check stream
     final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) server.getDivision(header.getRaftGroupId()).getStateMachine();
     final SingleDataStream stream = stateMachine.getSingleDataStream(header);
-    final DataChannel channel = stream.getWritableByteChannel();
+    final MyDataChannel channel = stream.getDataChannel();
     Assert.assertEquals(dataSize, channel.getBytesWritten());
     Assert.assertEquals(dataSize, channel.getForcedPosition());
 
@@ -296,15 +312,33 @@ public interface DataStreamTestUtils {
     Assert.assertEquals(expected.getCallId(), computed.getCallId());
   }
 
-  static ByteBuffer initBuffer(int offset, int size) {
-    final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
-    final int length = buffer.capacity();
-    buffer.position(0).limit(length);
-    for (int j = 0; j < length; j++) {
-      buffer.put(pos2byte(offset + j));
+  static LogEntryProto searchLogEntry(ClientInvocationId invocationId, RaftLog log) throws Exception {
+    for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
+      final LogEntryProto entry = log.get(termIndex.getIndex());
+      if (entry.hasStateMachineLogEntry()) {
+        if (invocationId.match(entry.getStateMachineLogEntry())) {
+          return entry;
+        }
+      }
     }
-    buffer.flip();
-    Assert.assertEquals(length, buffer.remaining());
-    return buffer;
+    return null;
+  }
+
+  static void assertLogEntry(LogEntryProto logEntry, RaftClientRequest request) {
+    Assert.assertNotNull(logEntry);
+    Assert.assertTrue(logEntry.hasStateMachineLogEntry());
+    final StateMachineLogEntryProto s = logEntry.getStateMachineLogEntry();
+    Assert.assertEquals(StateMachineLogEntryProto.Type.DATASTREAM, s.getType());
+    Assert.assertEquals(request.getCallId(), s.getCallId());
+    Assert.assertEquals(request.getClientId().toByteString(), s.getClientId());
+  }
+
+  static void assertLogEntry(RaftServerImpl impl, SingleDataStream stream) throws Exception {
+    final RaftClientRequest request = stream.getWriteRequest();
+    final LogEntryProto entryFromStream = stream.getLogEntry();
+    assertLogEntry(entryFromStream, request);
+
+    final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request), impl.getState().getLog());
+    Assert.assertSame(entryFromStream, entryFromLog);
   }
 }

Reply via email to