This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b07ee72 RATIS-1509. Remove GrpcOutputStream and the related conf GrpcConfigKeys.OutputStream.* (#592)
b07ee72 is described below
commit b07ee7219d75b0ae57463fd703eef494b1eb1beb
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Jan 27 20:08:20 2022 +0800
RATIS-1509. Remove GrpcOutputStream and the related conf GrpcConfigKeys.OutputStream.* (#592)
---
.../ratis/examples/arithmetic/cli/Server.java | 1 -
.../ratis/examples/filestore/cli/Server.java | 1 -
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 51 ---
.../ratis/grpc/client/GrpcClientStreamer.java | 398 ---------------------
.../apache/ratis/grpc/client/GrpcOutputStream.java | 116 ------
.../org/apache/ratis/OutputStreamBaseTest.java | 1 -
.../apache/ratis/grpc/TestGrpcOutputStream.java | 50 ---
.../ratis/grpc/TestRaftOutputStreamWithGrpc.java | 2 +-
8 files changed, 1 insertion(+), 619 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
index 8836b6a..a939974 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
@@ -66,7 +66,6 @@ public class Server extends SubCommandBase {
Optional.ofNullable(getPeer(peerId).getAdminAddress()).ifPresent(address ->
GrpcConfigKeys.Admin.setPort(properties,
NetUtils.createSocketAddr(address).getPort()));
- properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY,
Integer.MAX_VALUE);
RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(storageDir));
StateMachine stateMachine = new ArithmeticStateMachine();
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 973d30b..f6af368 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -97,7 +97,6 @@ public class Server extends SubCommandBase {
NettyConfigKeys.DataStream.setPort(properties, dataStreamport);
RaftConfigKeys.DataStream.setType(properties,
SupportedDataStreamType.NETTY);
}
- properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY,
Integer.MAX_VALUE);
RaftServerConfigKeys.setStorageDir(properties, storageDir);
RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
RaftServerConfigKeys.Write.setByteLimit(properties,
SizeInBytes.valueOf("1000MB"));
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index a9dddbb..4e95c1a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -20,11 +20,9 @@ package org.apache.ratis.grpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.apache.ratis.conf.ConfUtils.*;
@@ -198,55 +196,6 @@ public interface GrpcConfigKeys {
}
}
- interface OutputStream {
- Logger LOG = LoggerFactory.getLogger(OutputStream.class);
- static Consumer<String> getDefaultLog() {
- return LOG::debug;
- }
-
- String PREFIX = GrpcConfigKeys.PREFIX + ".outputstream";
-
- String BUFFER_SIZE_KEY = PREFIX + ".buffer.size";
- SizeInBytes BUFFER_SIZE_DEFAULT = SizeInBytes.valueOf("64KB");
- static SizeInBytes bufferSize(RaftProperties properties) {
- return getSizeInBytes(properties::getSizeInBytes,
- BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT, getDefaultLog());
- }
- static void setBufferSize(RaftProperties properties, SizeInBytes
bufferSize) {
- setSizeInBytes(properties::set, BUFFER_SIZE_KEY, bufferSize);
- }
-
- String RETRY_TIMES_KEY = PREFIX + ".retry.times";
- int RETRY_TIMES_DEFAULT = 5;
- static int retryTimes(RaftProperties properties) {
- return getInt(properties::getInt,
- RETRY_TIMES_KEY, RETRY_TIMES_DEFAULT, getDefaultLog(),
requireMin(1));
- }
- static void setRetryTimes(RaftProperties properties, int retryTimes) {
- setInt(properties::setInt, RETRY_TIMES_KEY, retryTimes);
- }
-
- String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval";
- TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300,
TimeUnit.MILLISECONDS);
- static TimeDuration retryInterval(RaftProperties properties) {
- return
getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
- RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog());
- }
- static void setRetryInterval(RaftProperties properties, TimeDuration
retryInterval) {
- setTimeDuration(properties::setTimeDuration, RETRY_INTERVAL_KEY,
retryInterval);
- }
-
- String OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".outstanding.appends.max";
- int OUTSTANDING_APPENDS_MAX_DEFAULT = 128;
- static int outstandingAppendsMax(RaftProperties properties) {
- return getInt(properties::getInt,
- OUTSTANDING_APPENDS_MAX_KEY, OUTSTANDING_APPENDS_MAX_DEFAULT,
getDefaultLog(), requireMin(0));
- }
- static void setOutstandingAppendsMax(RaftProperties properties, int
maxOutstandingAppends) {
- setInt(properties::setInt, OUTSTANDING_APPENDS_MAX_KEY,
maxOutstandingAppends);
- }
- }
-
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
static SizeInBytes messageSizeMax(RaftProperties properties,
Consumer<String> logger) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
deleted file mode 100644
index fcdff58..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.grpc.GrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public class GrpcClientStreamer implements Closeable {
- public static final Logger LOG =
LoggerFactory.getLogger(GrpcClientStreamer.class);
-
- enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
-
- private static class ExceptionAndRetry {
- private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
- private final AtomicInteger retryTimes = new AtomicInteger(0);
- private final int maxRetryTimes;
- private final TimeDuration retryInterval;
-
- ExceptionAndRetry(RaftProperties prop) {
- maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
- retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
- }
-
- void addException(RaftPeerId peer, IOException e) {
- exceptionMap.put(peer, e);
- retryTimes.incrementAndGet();
- }
-
- IOException getCombinedException() {
- return new IOException("Exceptions: " + exceptionMap);
- }
-
- boolean shouldRetry() {
- return retryTimes.get() <= maxRetryTimes;
- }
- }
-
- private final Deque<RaftClientRequestProto> dataQueue;
- private final Deque<RaftClientRequestProto> ackQueue;
- private final int maxPendingNum;
- private final SizeInBytes maxMessageSize;
-
- private final PeerProxyMap<GrpcClientProtocolProxy> proxyMap;
- private final Map<RaftPeerId, RaftPeer> peers;
- private RaftPeerId leaderId;
- private volatile GrpcClientProtocolProxy leaderProxy;
- private final ClientId clientId;
- private final String name;
-
- private volatile RunningState running = RunningState.RUNNING;
- private final ExceptionAndRetry exceptionAndRetry;
- private final Sender senderThread;
- private final RaftGroupId groupId;
-
- GrpcClientStreamer(RaftProperties prop, RaftGroup group,
- RaftPeerId leaderId, ClientId clientId, GrpcTlsConfig tlsConfig) {
- this.clientId = clientId;
- this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + clientId;
- maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
- maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
- dataQueue = new ConcurrentLinkedDeque<>();
- ackQueue = new ConcurrentLinkedDeque<>();
- exceptionAndRetry = new ExceptionAndRetry(prop);
-
- this.groupId = group.getGroupId();
- this.peers = group.getPeers().stream().collect(
- Collectors.toMap(RaftPeer::getId, Function.identity()));
- proxyMap = new PeerProxyMap<>(clientId.toString(),
- raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer,
- ResponseHandler::new, prop, tlsConfig));
- proxyMap.addRaftPeers(group.getPeers());
- refreshLeaderProxy(leaderId, null);
-
- senderThread = new Sender();
- senderThread.setName(this.toString() + "-sender");
- senderThread.start();
- }
-
- private synchronized void refreshLeaderProxy(RaftPeerId suggested,
- RaftPeerId oldLeader) {
- if (suggested != null) {
- leaderId = suggested;
- } else {
- if (oldLeader == null) {
- leaderId = peers.keySet().iterator().next();
- } else {
- leaderId = CollectionUtils.random(oldLeader, peers.keySet());
- if (leaderId == null) {
- leaderId = oldLeader;
- }
- }
- }
- LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
- oldLeader, leaderId, suggested);
- if (leaderProxy != null) {
- leaderProxy.closeCurrentSession();
- }
- try {
- leaderProxy = proxyMap.getProxy(leaderId);
- } catch (IOException e) {
- LOG.error("Should not hit IOException here", e);
- refreshLeader(null, leaderId);
- }
- }
-
- private boolean isRunning() {
- return running == RunningState.RUNNING ||
- running == RunningState.LOOK_FOR_LEADER;
- }
-
- private void checkState() throws IOException {
- if (!isRunning()) {
- throwException("The GrpcClientStreamer has been closed");
- }
- }
-
- synchronized void write(ByteString content, long seqNum)
- throws IOException {
- checkState();
- while (isRunning() && dataQueue.size() >= maxPendingNum) {
- try {
- wait();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- if (isRunning()) {
- // wrap the current buffer into a RaftClientRequestProto
- final RaftClientRequestProto request =
ClientProtoUtils.toRaftClientRequestProto(
- clientId, leaderId, groupId, seqNum, seqNum, content);
- if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
- throw new IOException("msg size:" + request.getSerializedSize() +
- " exceeds maximum:" + maxMessageSize.getSizeInt());
- }
- dataQueue.offer(request);
- this.notifyAll();
- } else {
- throwException(this + " got closed.");
- }
- }
-
- synchronized void flush() throws IOException {
- checkState();
- if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
- return;
- }
- // wait for the pending Q to become empty
- while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
- try {
- wait();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
- throwException(this + " got closed before finishing flush");
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!isRunning()) {
- return;
- }
- flush();
-
- running = RunningState.CLOSED;
- senderThread.interrupt();
- try {
- senderThread.join();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- proxyMap.close();
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- private class Sender extends Daemon {
- @Override
- public void run() {
- while (isRunning()) {
-
- synchronized (GrpcClientStreamer.this) {
- while (isRunning() && shouldWait()) {
- try {
- GrpcClientStreamer.this.wait();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- if (running == RunningState.RUNNING) {
- Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is
empty");
- RaftClientRequestProto next = dataQueue.poll();
- leaderProxy.onNext(next);
- ackQueue.offer(next);
- }
- }
- }
- }
-
- private boolean shouldWait() {
- // the sender should wait if any of the following is true
- // 1) there is no data to send
- // 2) there are too many outstanding pending requests
- // 3) Error/NotLeaderException just happened, we're still waiting for
- // the first response to confirm the new leader
- return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
- running == RunningState.LOOK_FOR_LEADER;
- }
- }
-
- /** the response handler for stream RPC */
- private class ResponseHandler implements
- GrpcClientProtocolProxy.CloseableStreamObserver {
- private final RaftPeerId targetId;
- // once handled the first NotLeaderException or Error, the handler should
- // be inactive and should not make any further action.
- private volatile boolean active = true;
-
- ResponseHandler(RaftPeer target) {
- targetId = target.getId();
- }
-
- @Override
- public String toString() {
- return GrpcClientStreamer.this + "-ResponseHandler-" + targetId;
- }
-
- @Override
- public void onNext(RaftClientReplyProto reply) {
- if (!active) {
- return;
- }
- synchronized (GrpcClientStreamer.this) {
- RaftClientRequestProto pending =
Objects.requireNonNull(ackQueue.peek());
- if (reply.getRpcReply().getSuccess()) {
- Preconditions.assertTrue(pending.getRpcRequest().getCallId() ==
reply.getRpcReply().getCallId(),
- () -> "pending=" + ClientProtoUtils.toString(pending) + " but
reply=" + ClientProtoUtils.toString(reply));
- ackQueue.poll();
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} received success ack for {}", this,
ClientProtoUtils.toString(pending));
- }
- // we've identified the correct leader
- if (running == RunningState.LOOK_FOR_LEADER) {
- running = RunningState.RUNNING;
- }
- } else {
- // this may be a NotLeaderException
- RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
- final NotLeaderException nle = r.getNotLeaderException();
- if (nle != null) {
- LOG.debug("{} received a NotLeaderException from {}", this,
- r.getServerId());
- handleNotLeader(nle, targetId);
- }
- }
- GrpcClientStreamer.this.notifyAll();
- }
- }
-
- @Override
- public void onError(Throwable t) {
- LOG.warn(this + " onError", t);
- if (active) {
- synchronized (GrpcClientStreamer.this) {
- handleError(t, this);
- GrpcClientStreamer.this.notifyAll();
- }
- }
- }
-
- @Override
- public void onCompleted() {
- LOG.info("{} onCompleted, pending requests #: {}", this,
- ackQueue.size());
- }
-
- @Override // called by handleError and handleNotLeader
- public void close() {
- active = false;
- }
- }
-
- private void throwException(String msg) throws IOException {
- if (running == RunningState.ERROR) {
- throw exceptionAndRetry.getCombinedException();
- } else {
- throw new IOException(msg);
- }
- }
-
- private void handleNotLeader(NotLeaderException nle,
- RaftPeerId oldLeader) {
- Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this));
- // handle NotLeaderException: refresh leader and RaftConfiguration
- refreshPeers(nle.getPeers());
-
- refreshLeader(nle.getSuggestedLeader().getId(), oldLeader);
- }
-
- private synchronized void handleError(Throwable t, ResponseHandler handler) {
- final IOException e = GrpcUtil.unwrapIOException(t);
-
- exceptionAndRetry.addException(handler.targetId, e);
- LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
- handler, e, exceptionAndRetry.retryTimes.get(),
- exceptionAndRetry.maxRetryTimes);
-
- leaderProxy.onError();
- if (exceptionAndRetry.shouldRetry()) {
- refreshLeader(null, leaderId);
- } else {
- running = RunningState.ERROR;
- }
- }
-
- private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader)
{
- running = RunningState.LOOK_FOR_LEADER;
- refreshLeaderProxy(suggestedLeader, oldLeader);
- reQueuePendingRequests(leaderId);
-
- final RaftClientRequestProto request = Objects.requireNonNull(
- dataQueue.poll());
- ackQueue.offer(request);
- try {
- exceptionAndRetry.retryInterval.sleep();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- leaderProxy.onNext(request);
- }
-
- private void reQueuePendingRequests(RaftPeerId newLeader) {
- if (isRunning()) {
- // resend all the pending requests
- while (!ackQueue.isEmpty()) {
- final RaftClientRequestProto oldRequest = ackQueue.pollLast();
- final RaftRpcRequestProto.Builder newRpc =
RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest())
- .setReplyId(newLeader.toByteString());
- final RaftClientRequestProto newRequest =
RaftClientRequestProto.newBuilder(oldRequest)
- .setRpcRequest(newRpc).build();
- dataQueue.offerFirst(newRequest);
- }
- }
- }
-
- private void refreshPeers(Collection<RaftPeer> newPeers) {
- if (newPeers != null && newPeers.size() > 0) {
- // we only add new peers, we do not remove any peer even if it no longer
- // belongs to the current raft conf
- newPeers.forEach(peer -> {
- peers.putIfAbsent(peer.getId(), peer);
- proxyMap.computeIfAbsent(peer);
- });
-
- LOG.debug("refreshed peers: {}", peers);
- }
- }
-}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
deleted file mode 100644
index af1b1f1..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.ProtoUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class GrpcOutputStream extends OutputStream {
- /** internal buffer */
- private final byte[] buf;
- private int count;
- private final AtomicLong seqNum = new AtomicLong();
- private final ClientId clientId;
- private final String name;
- private final GrpcClientStreamer streamer;
-
- private boolean closed = false;
-
- public GrpcOutputStream(RaftProperties prop, ClientId clientId,
- RaftGroup group, RaftPeerId leaderId, GrpcTlsConfig tlsConfig) {
- final int bufferSize =
GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
- buf = new byte[bufferSize];
- count = 0;
- this.clientId = clientId;
- this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + clientId;
- streamer = new GrpcClientStreamer(prop, group, leaderId, clientId,
tlsConfig);
- }
-
- @Override
- public void write(int b) throws IOException {
- checkClosed();
- buf[count++] = (byte)b;
- flushIfNecessary();
- }
-
- private void flushIfNecessary() throws IOException {
- if(count == buf.length) {
- flushToStreamer();
- }
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- checkClosed();
- if (off < 0 || len < 0 || off > b.length - len) {
- throw new ArrayIndexOutOfBoundsException();
- }
-
- int total = 0;
- while (total < len) {
- int toWrite = Math.min(len - total, buf.length - count);
- System.arraycopy(b, off + total, buf, count, toWrite);
- count += toWrite;
- total += toWrite;
- flushIfNecessary();
- }
- }
-
- private void flushToStreamer() throws IOException {
- if (count > 0) {
- streamer.write(ProtoUtils.toByteString(buf, 0, count),
- seqNum.getAndIncrement());
- count = 0;
- }
- }
-
- @Override
- public void flush() throws IOException {
- checkClosed();
- flushToStreamer();
- streamer.flush();
- }
-
- @Override
- public void close() throws IOException {
- flushToStreamer();
- streamer.close(); // streamer will flush
- this.closed = true;
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- private void checkClosed() throws IOException {
- if (closed) {
- throw new IOException(this.toString() + " was closed.");
- }
- }
-}
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 8861e2a..d86170d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -262,7 +262,6 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
/**
* Write while leader is killed
*/
- @Ignore
@Test
public void testKillLeader() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestKillLeader);
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
deleted file mode 100644
index 87cf758..0000000
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.OutputStreamBaseTest;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.GrpcClientStreamer;
-import org.apache.ratis.grpc.client.GrpcOutputStream;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.util.Log4jUtils;
-import org.apache.ratis.util.SizeInBytes;
-
-import java.io.OutputStream;
-
-/**
- * Test {@link GrpcOutputStream}
- */
-public class TestGrpcOutputStream
- extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
- implements MiniRaftClusterWithGrpc.FactoryGet {
-
- {
- final RaftProperties p = getProperties();
- GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(p, false);
- GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(p, 8);
- }
-
- @Override
- public OutputStream newOutputStream(MiniRaftClusterWithGrpc cluster, int
bufferSize) {
- final RaftProperties p = getProperties();
- GrpcConfigKeys.OutputStream.setBufferSize(p,
SizeInBytes.valueOf(bufferSize));
- return new GrpcOutputStream(p, ClientId.randomId(), cluster.getGroup(),
cluster.getLeader().getId(), null);
- }
-}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
index fc5c91d..fb35d95 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
@@ -24,6 +24,6 @@ public class TestRaftOutputStreamWithGrpc
implements MiniRaftClusterWithGrpc.FactoryGet {
@Override
public int getGlobalTimeoutSeconds() {
- return 30;
+ return 100;
}
}