Repository: incubator-ratis Updated Branches: refs/heads/master 1e7a06ef1 -> e2bdc2478
RATIS-101. Use ByteString instead of byte[] in RaftId, ClientId, RaftGroupId and RaftPeerId. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e2bdc247 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e2bdc247 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e2bdc247 Branch: refs/heads/master Commit: e2bdc2478f45832a12582a0f94f32735047f151c Parents: 1e7a06e Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Mon Aug 7 14:08:16 2017 -0700 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Mon Aug 7 14:09:06 2017 -0700 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 101 ++++++++++--------- .../org/apache/ratis/protocol/ClientId.java | 8 +- .../org/apache/ratis/protocol/RaftGroupId.java | 8 +- .../java/org/apache/ratis/protocol/RaftId.java | 43 ++++---- .../org/apache/ratis/protocol/RaftPeerId.java | 23 ++--- .../java/org/apache/ratis/util/ProtoUtils.java | 35 ++++--- .../ratis/grpc/client/AppendStreamer.java | 17 ++-- .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/impl/ServerProtoUtils.java | 67 ++++++------ .../java/org/apache/ratis/RaftTestUtil.java | 18 ++-- .../org/apache/ratis/protocol/TestRaftId.java | 48 +++++++++ .../server/simulation/RaftServerReply.java | 7 +- .../server/simulation/RaftServerRequest.java | 7 +- 13 files changed, 218 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 
146622b..2968884 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -17,9 +17,9 @@ */ package org.apache.ratis.client.impl; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.*; -import org.apache.ratis.protocol.*; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.ReflectionUtils; @@ -29,54 +29,68 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.Exce import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION; public class ClientProtoUtils { + public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( - byte[] requestorId, byte[] replyId, byte[] groupId, long callId, boolean success) { + ByteString requestorId, ByteString replyId, RaftGroupId groupId, + long callId, boolean success) { return RaftRpcReplyProto.newBuilder() - .setRequestorId(ProtoUtils.toByteString(requestorId)) - .setReplyId(ProtoUtils.toByteString(replyId)) - .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(groupId))) + .setRequestorId(requestorId) + .setReplyId(replyId) + .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId)) .setCallId(callId) .setSuccess(success); } public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( - byte[] requesterId, byte[] replyId, byte[] raftGroupId, long callId) { + ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId) { return RaftRpcRequestProto.newBuilder() - .setRequestorId(ProtoUtils.toByteString(requesterId)) - .setReplyId(ProtoUtils.toByteString(replyId)) - .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(raftGroupId))) + .setRequestorId(requesterId) + .setReplyId(replyId) + 
.setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId)) .setCallId(callId); } + public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( + ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId) { + return toRaftRpcRequestProtoBuilder( + requesterId.toByteString(), replyId.toByteString(), groupId, callId); + } + + private static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( + RaftClientRequest request) { + return toRaftRpcRequestProtoBuilder( + request.getClientId(), + request.getServerId(), + request.getRaftGroupId(), + request.getCallId()); + } + public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) { - ClientId clientId = new ClientId( - p.getRpcRequest().getRequestorId().toByteArray()); - RaftGroupId groupId = - new RaftGroupId(p.getRpcRequest().getRaftGroupId().getId().toByteArray()); - RaftPeerId serverId = RaftPeerId.valueOf(p.getRpcRequest().getReplyId()); - return new RaftClientRequest(clientId, serverId, groupId, - p.getRpcRequest().getCallId(), + final RaftRpcRequestProto request = p.getRpcRequest(); + return new RaftClientRequest( + new ClientId(request.getRequestorId()), + RaftPeerId.valueOf(request.getReplyId()), + ProtoUtils.toRaftGroupId(request.getRaftGroupId()), + request.getCallId(), toMessage(p.getMessage()), p.getReadOnly()); } public static RaftClientRequestProto toRaftClientRequestProto( RaftClientRequest request) { return RaftClientRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(), - request.getServerId().toBytes(), request.getRaftGroupId().toBytes(), - request.getCallId())) - .setMessage(toClientMessageEntryProto(request.getMessage())) + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) + .setMessage(toClientMessageEntryProtoBuilder(request.getMessage())) .setReadOnly(request.isReadOnly()) .build(); } - public static RaftClientRequestProto genRaftClientRequestProto( + public static RaftClientRequestProto 
toRaftClientRequestProto( ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, ByteString content, boolean readOnly) { return RaftClientRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(), - serverId.toBytes(), groupId.toBytes(), callId)) - .setMessage(ClientMessageEntryProto.newBuilder().setContent(content)) + .setRpcRequest(toRaftRpcRequestProtoBuilder( + clientId, serverId, groupId, callId)) + .setMessage(toClientMessageEntryProtoBuilder(content)) .setReadOnly(readOnly) .build(); } @@ -85,11 +99,11 @@ public class ClientProtoUtils { RaftClientReply reply) { final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder(); if (reply != null) { - b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(), - reply.getServerId().toBytes(), reply.getRaftGroupId().toBytes(), + b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toByteString(), + reply.getServerId().toByteString(), reply.getRaftGroupId(), reply.getCallId(), reply.isSuccess())); if (reply.getMessage() != null) { - b.setMessage(toClientMessageEntryProto(reply.getMessage())); + b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage())); } if (reply.isNotLeader()) { NotLeaderException nle = reply.getNotLeaderException(); @@ -134,8 +148,8 @@ public class ClientProtoUtils { smeProto.getExceptionClassName(), smeProto.getErrorMsg(), smeProto.getStacktrace()); } - ClientId clientId = new ClientId(rp.getRequestorId().toByteArray()); - RaftGroupId groupId = new RaftGroupId(rp.getRaftGroupId().getId().toByteArray()); + ClientId clientId = new ClientId(rp.getRequestorId()); + final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId()); return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), groupId, rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e); @@ -167,9 +181,12 @@ public class ClientProtoUtils { return p::getContent; } - private static ClientMessageEntryProto 
toClientMessageEntryProto(Message message) { - return ClientMessageEntryProto.newBuilder() - .setContent(message.getContent()).build(); + private static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(ByteString message) { + return ClientMessageEntryProto.newBuilder().setContent(message); + } + + private static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(Message message) { + return toClientMessageEntryProtoBuilder(message.getContent()); } public static SetConfigurationRequest toSetConfigurationRequest( @@ -177,20 +194,16 @@ public class ClientProtoUtils { final RaftRpcRequestProto m = p.getRpcRequest(); final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList()); return new SetConfigurationRequest( - new ClientId(m.getRequestorId().toByteArray()), + new ClientId(m.getRequestorId()), RaftPeerId.valueOf(m.getReplyId()), - new RaftGroupId(m.getRaftGroupId().getId().toByteArray()), + ProtoUtils.toRaftGroupId(m.getRaftGroupId()), p.getRpcRequest().getCallId(), peers); } public static SetConfigurationRequestProto toSetConfigurationRequestProto( SetConfigurationRequest request) { return SetConfigurationRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder( - request.getClientId().toBytes(), - request.getServerId().toBytes(), - request.getRaftGroupId().toBytes(), - request.getCallId())) + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) .addAllPeers(ProtoUtils.toRaftPeerProtos( Arrays.asList(request.getPeersInNewConf()))) .build(); @@ -201,20 +214,16 @@ public class ClientProtoUtils { final RaftRpcRequestProto m = p.getRpcRequest(); final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList()); return new ReinitializeRequest( - new ClientId(m.getRequestorId().toByteArray()), + new ClientId(m.getRequestorId()), RaftPeerId.valueOf(m.getReplyId()), - new RaftGroupId(m.getRaftGroupId().getId().toByteArray()), + ProtoUtils.toRaftGroupId(m.getRaftGroupId()), p.getRpcRequest().getCallId(), peers); } 
public static ReinitializeRequestProto toReinitializeRequestProto( ReinitializeRequest request) { return ReinitializeRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder( - request.getClientId().toBytes(), - request.getServerId().toBytes(), - request.getRaftGroupId().toBytes(), - request.getCallId())) + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers())) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java index a058a21..9b42076 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java @@ -17,6 +17,8 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; + import java.util.UUID; /** @@ -29,7 +31,7 @@ public class ClientId extends RaftId { return new ClientId(uuid); } - public ClientId(byte[] data) { + public ClientId(ByteString data) { super(data); } @@ -38,7 +40,7 @@ public class ClientId extends RaftId { } @Override - String createUuidString() { - return "client-" + super.createUuidString(); + String createUuidString(UUID uuid) { + return "client-" + super.createUuidString(uuid); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java index f75ac8f..a7ea70d 100644 --- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java @@ -17,6 +17,8 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; + import java.util.UUID; public class RaftGroupId extends RaftId { @@ -30,12 +32,12 @@ public class RaftGroupId extends RaftId { super(id); } - public RaftGroupId(byte[] data) { + public RaftGroupId(ByteString data) { super(data); } @Override - String createUuidString() { - return "group-" + super.createUuidString(); + String createUuidString(UUID uuid) { + return "group-" + super.createUuidString(uuid); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java index 5556c17..ebf9f75 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; @@ -28,46 +29,50 @@ import java.util.function.Supplier; public abstract class RaftId { public static final int BYTE_LENGTH = 16; - static UUID toUuid(byte[] data) { - Objects.requireNonNull(data, "data == null"); - Preconditions.assertTrue(data.length == BYTE_LENGTH, - "data.length = %s != BYTE_LENGTH = %s", data.length, BYTE_LENGTH); - ByteBuffer buffer = ByteBuffer.wrap(data); - return new UUID(buffer.getLong(), buffer.getLong()); + private static void checkLength(int length, String name) { + Preconditions.assertTrue(length == BYTE_LENGTH, + "%s = %s != BYTE_LENGTH = %s", name, length, 
BYTE_LENGTH); } - static byte[] toBytes(UUID uuid) { + private static UUID toUuid(ByteString bytes) { + Objects.requireNonNull(bytes, "bytes == null"); + checkLength(bytes.size(), "bytes.size()"); + final ByteBuffer buf = bytes.asReadOnlyByteBuffer(); + return new UUID(buf.getLong(), buf.getLong()); + } + + private static ByteString toByteString(UUID uuid) { Objects.requireNonNull(uuid, "uuid == null"); - ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); + final ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); buf.putLong(uuid.getMostSignificantBits()); buf.putLong(uuid.getLeastSignificantBits()); - return buf.array(); + return ByteString.copyFrom(buf.array()); } private final UUID uuid; - private final byte[] uuidBytes; + private final Supplier<ByteString> uuidBytes; private final Supplier<String> uuidString; - private RaftId(UUID uuid, byte[] bytes) { + private RaftId(UUID uuid, Supplier<ByteString> uuidBytes) { this.uuid = uuid; - this.uuidBytes = bytes; - this.uuidString = JavaUtils.memoize(this::createUuidString); + this.uuidBytes = uuidBytes; + this.uuidString = JavaUtils.memoize(() -> createUuidString(uuid)); } RaftId(UUID uuid) { - this(uuid, toBytes(uuid)); + this(uuid, JavaUtils.memoize(() -> toByteString(uuid))); } - public RaftId(byte[] uuidBytes) { - this(toUuid(uuidBytes), uuidBytes); + public RaftId(ByteString uuidBytes) { + this(toUuid(uuidBytes), () -> uuidBytes); } - String createUuidString() { + String createUuidString(UUID uuid) { return uuid.toString().toUpperCase(); } - public byte[] toBytes() { - return uuidBytes; + public ByteString toByteString() { + return uuidBytes.get(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java index 4e1a5d8..06ad836 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java @@ -21,7 +21,6 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.util.Preconditions; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -34,8 +33,7 @@ public class RaftPeerId { private static final Map<String, RaftPeerId> stringMap = new ConcurrentHashMap<>(); public static RaftPeerId valueOf(ByteString id) { - return byteStringMap.computeIfAbsent(id, - key -> new RaftPeerId(key.toByteArray())); + return byteStringMap.computeIfAbsent(id, RaftPeerId::new); } public static RaftPeerId valueOf(String id) { @@ -49,24 +47,24 @@ public class RaftPeerId { /** UTF-8 string as id */ private final String idString; /** The corresponding bytes of {@link #idString}. */ - private final byte[] id; + private final ByteString id; private RaftPeerId(String id) { this.idString = Objects.requireNonNull(id, "id == null"); Preconditions.assertTrue(!id.isEmpty(), "id is an empty string."); - this.id = id.getBytes(StandardCharsets.UTF_8); + this.id = ByteString.copyFrom(idString, StandardCharsets.UTF_8); } - private RaftPeerId(byte[] id) { + private RaftPeerId(ByteString id) { this.id = Objects.requireNonNull(id, "id == null"); - Preconditions.assertTrue(id.length > 0, "id is an empty array."); - this.idString = new String(id, StandardCharsets.UTF_8); + Preconditions.assertTrue(id.size() > 0, "id is empty."); + this.idString = id.toString(StandardCharsets.UTF_8); } /** - * @return id in byte[]. + * @return id in {@link ByteString}. 
*/ - public byte[] toBytes() { + public ByteString toByteString() { return id; } @@ -78,12 +76,11 @@ public class RaftPeerId { @Override public boolean equals(Object other) { return other == this || - (other instanceof RaftPeerId && - Arrays.equals(id, ((RaftPeerId) other).id)); + (other instanceof RaftPeerId && idString.equals(((RaftPeerId)other).idString)); } @Override public int hashCode() { - return Arrays.hashCode(id); + return idString.hashCode(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 7d73251..9a2d530 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -17,26 +17,20 @@ */ package org.apache.ratis.util; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.ServiceException; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftPeerProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; -import 
org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; public class ProtoUtils { public static ByteString toByteString(Object obj) { @@ -73,7 +67,7 @@ public class ProtoUtils { public static RaftPeerProto toRaftPeerProto(RaftPeer peer) { RaftPeerProto.Builder builder = RaftPeerProto.newBuilder() - .setId(toByteString(peer.getId().toBytes())); + .setId(peer.getId().toByteString()); if (peer.getAddress() != null) { builder.setAddress(peer.getAddress()); } @@ -109,6 +103,15 @@ public class ProtoUtils { }; } + public static RaftGroupId toRaftGroupId(RaftGroupIdProto proto) { + return new RaftGroupId(proto.getId()); + } + + public static RaftGroupIdProto.Builder toRaftGroupIdProtoBuilder(RaftGroupId id) { + return RaftGroupIdProto.newBuilder().setId(id.toByteString()); + } + + public static boolean isConfigurationLogEntry(LogEntryProto entry) { return entry.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; @@ -119,7 +122,7 @@ public class ProtoUtils { ClientId clientId, long callId) { return LogEntryProto.newBuilder().setTerm(term).setIndex(index) .setSmLogEntry(operation) - .setClientId(toByteString(clientId.toBytes())).setCallId(callId) + .setClientId(clientId.toByteString()).setCallId(callId) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index e7d2cd0..36a588e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -17,10 +17,8 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.util.CollectionUtils; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.TimeDuration; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; @@ -28,8 +26,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.util.Daemon; -import org.apache.ratis.util.PeerProxyMap; +import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.ratis.client.impl.ClientProtoUtils.*; - public class AppendStreamer implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class); @@ -158,7 +153,7 @@ public class AppendStreamer implements Closeable { } if (isRunning()) { // wrap the current buffer into a RaftClientRequestProto - final RaftClientRequestProto request = genRaftClientRequestProto( + final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto( clientId, leaderId, groupId, seqNum, content, false); dataQueue.offer(request); this.notifyAll(); @@ -274,7 +269,7 @@ public class AppendStreamer implements Closeable { } } else { // this may be a NotLeaderException - RaftClientReply r = toRaftClientReply(reply); + RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply); if (r.isNotLeader()) { LOG.debug("{} received a NotLeaderException from 
{}", this, r.getServerId()); @@ -365,8 +360,8 @@ public class AppendStreamer implements Closeable { RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder() .setMessage(oldRequest.getMessage()) .setReadOnly(oldRequest.getReadOnly()) - .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(), - newLeader.toBytes(), groupId.toBytes(), r.getCallId())) + .setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder( + clientId, newLeader, groupId, r.getCallId())) .build(); dataQueue.offerFirst(newRequest); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 1a73cba..9a5e8bc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -861,7 +861,7 @@ public class RaftServerImpl implements RaftServerProtocol, private void replyPendingRequest(LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) { // update the retry cache - final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray()); + final ClientId clientId = new ClientId(logEntry.getClientId()); final long callId = logEntry.getCallId(); final RaftPeerId serverId = getId(); final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 5b11599..845a6ca 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -18,7 +18,6 @@ package org.apache.ratis.server.impl; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; -import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; import java.util.Arrays; import java.util.List; @@ -28,19 +27,8 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.shaded.proto.RaftProtos; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftConfigurationProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.TermIndexProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*; import org.apache.ratis.util.ProtoUtils; @@ -108,23 +96,32 @@ public class ServerProtoUtils { .build(); } + static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( + RaftPeerId 
requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean success) { + return ClientProtoUtils.toRaftRpcReplyProtoBuilder( + requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, success); + } + public static RequestVoteReplyProto toRequestVoteReplyProto( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean success, long term, boolean shouldShutdown) { - final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder(); - b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder( - requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, success)) + return RequestVoteReplyProto.newBuilder() + .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, groupId, success)) .setTerm(term) - .setShouldShutdown(shouldShutdown); - return b.build(); + .setShouldShutdown(shouldShutdown) + .build(); + } + + static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) { + return ClientProtoUtils.toRaftRpcRequestProtoBuilder( + requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID); } public static RequestVoteRequestProto toRequestVoteRequestProto( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, TermIndex lastEntry) { - RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils - .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID); final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder() - .setServerRequest(rpb) + .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId)) .setCandidateTerm(term); if (lastEntry != null) { b.setCandidateLastEntry(toTermIndexProto(lastEntry)); @@ -135,8 +132,8 @@ public class ServerProtoUtils { public static InstallSnapshotReplyProto toInstallSnapshotReplyProto( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, int 
requestIndex, InstallSnapshotResult result) { - final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS); + final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId, + replyId, groupId, result == InstallSnapshotResult.SUCCESS); final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto .newBuilder().setServerReply(rb).setTerm(term).setResult(result) .setRequestIndex(requestIndex); @@ -148,9 +145,7 @@ public class ServerProtoUtils { long term, TermIndex lastTermIndex, List<FileChunkProto> chunks, long totalSize, boolean done) { return InstallSnapshotRequestProto.newBuilder() - .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID)) + .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId)) .setRequestId(requestId) .setRequestIndex(requestIndex) // .setRaftConfiguration() TODO: save and pass RaftConfiguration @@ -163,13 +158,13 @@ public class ServerProtoUtils { public static AppendEntriesReplyProto toAppendEntriesReplyProto( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, - long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) { - RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS); - final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder(); - b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex) - .setResult(appendResult); - return b.build(); + long nextIndex, AppendResult result) { + return AppendEntriesReplyProto.newBuilder() + .setServerReply(toRaftRpcReplyProtoBuilder( + requestorId, replyId, groupId, result == AppendResult.SUCCESS)) + .setTerm(term) + .setNextIndex(nextIndex) + 
.setResult(result).build(); } public static AppendEntriesRequestProto toAppendEntriesRequestProto( @@ -178,9 +173,7 @@ public class ServerProtoUtils { TermIndex previous) { final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto .newBuilder() - .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID)) + .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId)) .setLeaderTerm(leaderTerm) .setLeaderCommit(leaderCommit) .setInitializing(initializing); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index ec39073..f785a30 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -39,8 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Objects; @@ -48,8 +47,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.BooleanSupplier; import java.util.function.IntSupplier; -import static org.apache.ratis.util.ProtoUtils.toByteString; - public class RaftTestUtil { public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0]; static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class); @@ -168,6 +165,10 @@ public class RaftTestUtil { } } + public static ByteString toByteString(String string) { + return ByteString.copyFrom(string, StandardCharsets.UTF_8); + } + public static class SimpleMessage implements 
Message { public static SimpleMessage[] create(int numMessages) { return create(numMessages, "m"); @@ -211,7 +212,7 @@ public class RaftTestUtil { @Override public ByteString getContent() { - return toByteString(messageId.getBytes(Charset.forName("UTF-8"))); + return toByteString(messageId); } } @@ -240,12 +241,7 @@ public class RaftTestUtil { } public SMLogEntryProto getLogEntryContent() { - try { - return SMLogEntryProto.newBuilder() - .setData(toByteString(op.getBytes("UTF-8"))).build(); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } + return SMLogEntryProto.newBuilder().setData(toByteString(op)).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java new file mode 100644 index 0000000..b5806d5 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.protocol; + +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; + +public class TestRaftId { + @Test + public void testClientId() { + final ClientId id = ClientId.createId(); + final ByteString bytes = id.toByteString(); + Assert.assertEquals(bytes, id.toByteString()); + Assert.assertEquals(id, new ClientId(bytes)); + } + + @Test + public void testRaftGroupId() { + final RaftGroupId id = RaftGroupId.createId(); + final ByteString bytes = id.toByteString(); + Assert.assertEquals(bytes, id.toByteString()); + Assert.assertEquals(id, new RaftGroupId(bytes)); + } + + @Test + public void testRaftPeerId() { + final RaftPeerId id = RaftPeerId.valueOf("abc"); + final ByteString bytes = id.toByteString(); + Assert.assertEquals(bytes, id.toByteString()); + Assert.assertEquals(id, RaftPeerId.valueOf(bytes)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java index 10d6272..df0545d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftRpcMessage; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.util.ProtoUtils; import java.util.Objects; @@ -102,11 +103,11 @@ public class RaftServerReply implements RaftRpcMessage { @Override public RaftGroupId getRaftGroupId() 
{ if (isAppendEntries()) { - return new RaftGroupId(appendEntries.getServerReply().getRaftGroupId().toByteArray()); + return ProtoUtils.toRaftGroupId(appendEntries.getServerReply().getRaftGroupId()); } else if (isRequestVote()) { - return new RaftGroupId(requestVote.getServerReply().getRaftGroupId().toByteArray()); + return ProtoUtils.toRaftGroupId(requestVote.getServerReply().getRaftGroupId()); } else { - return new RaftGroupId(installSnapshot.getServerReply().getRaftGroupId().toByteArray()); + return ProtoUtils.toRaftGroupId(installSnapshot.getServerReply().getRaftGroupId()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java index f9d3d31..e38296c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftRpcMessage; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.util.ProtoUtils; class RaftServerRequest implements RaftRpcMessage { private final AppendEntriesRequestProto appendEntries; @@ -100,11 +101,11 @@ class RaftServerRequest implements RaftRpcMessage { @Override public RaftGroupId getRaftGroupId() { if (isAppendEntries()) { - return new RaftGroupId(appendEntries.getServerRequest().getRaftGroupId().getId().toByteArray()); + return ProtoUtils.toRaftGroupId(appendEntries.getServerRequest().getRaftGroupId()); } else if 
(isRequestVote()) { - return new RaftGroupId(requestVote.getServerRequest().getRaftGroupId().getId().toByteArray()); + return ProtoUtils.toRaftGroupId(requestVote.getServerRequest().getRaftGroupId()); } else { - return new RaftGroupId(installSnapshot.getServerRequest().getRaftGroupId().getId().toByteArray()); + return ProtoUtils.toRaftGroupId(installSnapshot.getServerRequest().getRaftGroupId()); } } }
