This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0a9ae9a RATIS-1173. Rename StateMachineDataChannel to DataChannel. (#295)
0a9ae9a is described below
commit 0a9ae9ae7176a0ec7d5749868005225a0644b1ca
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 24 14:29:46 2020 +0800
RATIS-1173. Rename StateMachineDataChannel to DataChannel. (#295)
---
.../apache/ratis/protocol/ClientInvocationId.java | 8 +++
.../ratis/netty/server/DataStreamManagement.java | 6 +-
.../apache/ratis/statemachine/StateMachine.java | 4 +-
.../datastream/DataStreamAsyncClusterTests.java | 16 +++++-
.../ratis/datastream/DataStreamBaseTest.java | 12 ++--
.../ratis/datastream/DataStreamClusterTests.java | 38 ++-----------
.../ratis/datastream/DataStreamTestUtils.java | 64 +++++++++++++++++-----
7 files changed, 88 insertions(+), 60 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java
index 679b85c..24e491e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientInvocationId.java
@@ -20,6 +20,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import java.util.Objects;
+import java.util.Optional;
/**
* The id of a client invocation.
@@ -57,6 +58,13 @@ public final class ClientInvocationId {
return longId;
}
+ public boolean match(StateMachineLogEntryProto proto) {
+ return longId == proto.getCallId() && Optional.ofNullable(clientId)
+ .map(RaftId::toByteString)
+ .filter(b -> b.equals(proto.getClientId()))
+ .isPresent();
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index a78bd1f..14d4196 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -36,7 +36,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServer.Division;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
+import org.apache.ratis.statemachine.StateMachine.DataChannel;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -251,7 +251,7 @@ public class DataStreamManagement {
}
static long writeTo(ByteBuf buf, boolean sync, DataStream stream) {
- final StateMachineDataChannel channel = stream.getWritableByteChannel();
+ final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
try {
@@ -273,7 +273,7 @@ public class DataStreamManagement {
static long close(DataStream stream) {
try {
- stream.getWritableByteChannel().close();
+ stream.getDataChannel().close();
return 0L;
} catch (IOException e) {
throw new CompletionException("Failed to close " + stream, e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 32c5380..95c6c41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -238,7 +238,7 @@ public interface StateMachine extends Closeable {
/**
* For write state machine data.
*/
- interface StateMachineDataChannel extends WritableByteChannel {
+ interface DataChannel extends WritableByteChannel {
void force(boolean metadata) throws IOException;
}
@@ -247,7 +247,7 @@ public interface StateMachine extends Closeable {
*/
interface DataStream {
/** @return a channel for streaming state machine data. */
- StateMachineDataChannel getWritableByteChannel();
+ DataChannel getDataChannel();
/**
* Clean up asynchronously this stream.
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 3c98523..ae5ac90 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -27,9 +27,11 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
@@ -40,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
public abstract class DataStreamAsyncClusterTests<CLUSTER extends
MiniRaftCluster>
extends DataStreamClusterTests<CLUSTER> {
@@ -52,7 +55,17 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER
extends MiniRaftCluste
@Test
public void testMultipleStreamsMultipleServers() throws Exception {
+ // Avoid changing leader
+ final TimeDuration min =
RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
+ RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(),
TimeDuration.valueOf(2, TimeUnit.SECONDS));
+ final TimeDuration max =
RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
+ RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(),
TimeDuration.valueOf(3, TimeUnit.SECONDS));
+
runWithNewCluster(3, this::runTestDataStream);
+
+ // Reset
+ RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), min);
+ RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), max);
}
void runTestDataStream(CLUSTER cluster) throws Exception {
@@ -75,7 +88,8 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER
extends MiniRaftCluste
final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
final MultiDataStreamStateMachine stateMachine =
(MultiDataStreamStateMachine) impl.getStateMachine();
for (SingleDataStream s : stateMachine.getStreams()) {
- Assert.assertNotNull(s.getLogEntry());
+ Assert.assertFalse(s.getDataChannel().isOpen());
+ DataStreamTestUtils.assertLogEntry(impl, s);
}
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0f8d314..46eeed1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -23,7 +23,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamTestUtils.DataChannel;
+import org.apache.ratis.datastream.DataStreamTestUtils.MyDataChannel;
import
org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -55,7 +55,7 @@ import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
+import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
@@ -211,13 +211,13 @@ abstract class DataStreamBaseTest extends BaseTest {
final MyDivision d = getDivision(request.getRaftGroupId());
return d.getDataStreamMap()
.remove(ClientInvocationId.valueOf(request))
- .thenApply(StateMachine.DataStream::getWritableByteChannel)
+ .thenApply(StateMachine.DataStream::getDataChannel)
.thenApply(channel -> buildRaftClientReply(request, channel));
}
- static RaftClientReply buildRaftClientReply(RaftClientRequest request,
StateMachineDataChannel channel) {
- Assert.assertTrue(channel instanceof DataChannel);
- final DataChannel dataChannel = (DataChannel) channel;
+ static RaftClientReply buildRaftClientReply(RaftClientRequest request,
DataChannel channel) {
+ Assert.assertTrue(channel instanceof MyDataChannel);
+ final MyDataChannel dataChannel = (MyDataChannel) channel;
return RaftClientReply.newBuilder()
.setRequest(request)
.setSuccess()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 28dbc87..2e84546 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -21,23 +21,18 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
-import org.apache.ratis.conf.RaftProperties;
import
org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collection;
-import java.util.concurrent.TimeUnit;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -45,10 +40,6 @@ public abstract class DataStreamClusterTests<CLUSTER extends
MiniRaftCluster> ex
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
setStateMachine(MultiDataStreamStateMachine.class);
-
- // Avoid changing leader
- RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(),
TimeDuration.valueOf(2, TimeUnit.SECONDS));
- RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(),
TimeDuration.valueOf(3, TimeUnit.SECONDS));
}
public static final int NUM_SERVERS = 3;
@@ -66,38 +57,19 @@ public abstract class DataStreamClusterTests<CLUSTER
extends MiniRaftCluster> ex
Assert.assertEquals(NUM_SERVERS, peers.size());
RaftPeer raftPeer = peers.iterator().next();
- final ClientId clientId;
- final long callId;
+ final RaftClientRequest request;
try (RaftClient client = cluster.createClient(raftPeer)) {
- clientId = client.getId();
-
// send a stream request
try(final DataStreamOutputImpl out = (DataStreamOutputImpl)
client.getDataStreamApi().stream()) {
DataStreamTestUtils.writeAndAssertReplies(out, 1000, 10);
- callId = out.getHeader().getCallId();
+ request = out.getHeader();
}
}
// verify the write request is in the Raft log.
final RaftLog log = leader.getState().getLog();
- final LogEntryProto entry = findLogEntry(clientId, callId, log);
+ final LogEntryProto entry =
DataStreamTestUtils.searchLogEntry(ClientInvocationId.valueOf(request), log);
LOG.info("entry={}", entry);
Assert.assertNotNull(entry);
}
-
- static LogEntryProto findLogEntry(ClientId clientId, long callId, RaftLog
log) throws Exception {
- for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
- final LogEntryProto entry = log.get(termIndex.getIndex());
- if (entry.hasStateMachineLogEntry()) {
- final StateMachineLogEntryProto stateMachineEntry =
entry.getStateMachineLogEntry();
- if (stateMachineEntry.getCallId() == callId) {
- if (clientId.toByteString().equals(stateMachineEntry.getClientId()))
{
- return entry;
- }
- }
- }
- }
- return null;
- }
-
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 2f695a0..177d1ff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -24,6 +24,7 @@ import
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.Message;
@@ -33,9 +34,12 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -66,6 +70,18 @@ public interface DataStreamTestUtils {
return (byte) ('A' + pos % MODULUS);
}
+ static ByteBuffer initBuffer(int offset, int size) {
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+ final int length = buffer.capacity();
+ buffer.position(0).limit(length);
+ for (int j = 0; j < length; j++) {
+ buffer.put(pos2byte(offset + j));
+ }
+ buffer.flip();
+ Assert.assertEquals(length, buffer.remaining());
+ return buffer;
+ }
+
static ByteString bytesWritten2ByteString(long bytesWritten) {
return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
}
@@ -95,7 +111,7 @@ public interface DataStreamTestUtils {
final LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
final SingleDataStream s =
getSingleDataStream(ClientInvocationId.valueOf(entry.getStateMachineLogEntry()));
- final ByteString bytesWritten =
bytesWritten2ByteString(s.getWritableByteChannel().getBytesWritten());
+ final ByteString bytesWritten =
bytesWritten2ByteString(s.getDataChannel().getBytesWritten());
return CompletableFuture.completedFuture(() -> bytesWritten);
}
@@ -114,7 +130,7 @@ public interface DataStreamTestUtils {
class SingleDataStream implements DataStream {
private final RaftClientRequest writeRequest;
- private final DataChannel channel = new DataChannel();
+ private final MyDataChannel channel = new MyDataChannel();
private volatile LogEntryProto logEntry;
SingleDataStream(RaftClientRequest request) {
@@ -122,7 +138,7 @@ public interface DataStreamTestUtils {
}
@Override
- public DataChannel getWritableByteChannel() {
+ public MyDataChannel getDataChannel() {
return channel;
}
@@ -155,7 +171,7 @@ public interface DataStreamTestUtils {
}
}
- class DataChannel implements StateMachineDataChannel {
+ class MyDataChannel implements DataChannel {
private volatile boolean open = true;
private int bytesWritten = 0;
private int forcedPosition = 0;
@@ -253,7 +269,7 @@ public interface DataStreamTestUtils {
// check stream
final MultiDataStreamStateMachine stateMachine =
(MultiDataStreamStateMachine)
server.getDivision(header.getRaftGroupId()).getStateMachine();
final SingleDataStream stream = stateMachine.getSingleDataStream(header);
- final DataChannel channel = stream.getWritableByteChannel();
+ final MyDataChannel channel = stream.getDataChannel();
Assert.assertEquals(dataSize, channel.getBytesWritten());
Assert.assertEquals(dataSize, channel.getForcedPosition());
@@ -296,15 +312,33 @@ public interface DataStreamTestUtils {
Assert.assertEquals(expected.getCallId(), computed.getCallId());
}
- static ByteBuffer initBuffer(int offset, int size) {
- final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
- final int length = buffer.capacity();
- buffer.position(0).limit(length);
- for (int j = 0; j < length; j++) {
- buffer.put(pos2byte(offset + j));
+ static LogEntryProto searchLogEntry(ClientInvocationId invocationId, RaftLog
log) throws Exception {
+ for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
+ final LogEntryProto entry = log.get(termIndex.getIndex());
+ if (entry.hasStateMachineLogEntry()) {
+ if (invocationId.match(entry.getStateMachineLogEntry())) {
+ return entry;
+ }
+ }
}
- buffer.flip();
- Assert.assertEquals(length, buffer.remaining());
- return buffer;
+ return null;
+ }
+
+ static void assertLogEntry(LogEntryProto logEntry, RaftClientRequest
request) {
+ Assert.assertNotNull(logEntry);
+ Assert.assertTrue(logEntry.hasStateMachineLogEntry());
+ final StateMachineLogEntryProto s = logEntry.getStateMachineLogEntry();
+ Assert.assertEquals(StateMachineLogEntryProto.Type.DATASTREAM,
s.getType());
+ Assert.assertEquals(request.getCallId(), s.getCallId());
+ Assert.assertEquals(request.getClientId().toByteString(), s.getClientId());
+ }
+
+ static void assertLogEntry(RaftServerImpl impl, SingleDataStream stream)
throws Exception {
+ final RaftClientRequest request = stream.getWriteRequest();
+ final LogEntryProto entryFromStream = stream.getLogEntry();
+ assertLogEntry(entryFromStream, request);
+
+ final LogEntryProto entryFromLog =
searchLogEntry(ClientInvocationId.valueOf(request), impl.getState().getLog());
+ Assert.assertSame(entryFromStream, entryFromLog);
}
}