This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b07ee72  RATIS-1509. Remove GrpcOutputStream and the related conf GrpcConfigKeys.OutputStream.* (#592)
b07ee72 is described below

commit b07ee7219d75b0ae57463fd703eef494b1eb1beb
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Jan 27 20:08:20 2022 +0800

    RATIS-1509. Remove GrpcOutputStream and the related conf GrpcConfigKeys.OutputStream.* (#592)
---
 .../ratis/examples/arithmetic/cli/Server.java      |   1 -
 .../ratis/examples/filestore/cli/Server.java       |   1 -
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |  51 ---
 .../ratis/grpc/client/GrpcClientStreamer.java      | 398 ---------------------
 .../apache/ratis/grpc/client/GrpcOutputStream.java | 116 ------
 .../org/apache/ratis/OutputStreamBaseTest.java     |   1 -
 .../apache/ratis/grpc/TestGrpcOutputStream.java    |  50 ---
 .../ratis/grpc/TestRaftOutputStreamWithGrpc.java   |   2 +-
 8 files changed, 1 insertion(+), 619 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
index 8836b6a..a939974 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
@@ -66,7 +66,6 @@ public class Server extends SubCommandBase {
     Optional.ofNullable(getPeer(peerId).getAdminAddress()).ifPresent(address ->
         GrpcConfigKeys.Admin.setPort(properties, 
NetUtils.createSocketAddr(address).getPort()));
 
-    properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, 
Integer.MAX_VALUE);
     RaftServerConfigKeys.setStorageDir(properties, 
Collections.singletonList(storageDir));
     StateMachine stateMachine = new ArithmeticStateMachine();
 
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 973d30b..f6af368 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -97,7 +97,6 @@ public class Server extends SubCommandBase {
       NettyConfigKeys.DataStream.setPort(properties, dataStreamport);
       RaftConfigKeys.DataStream.setType(properties, 
SupportedDataStreamType.NETTY);
     }
-    properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, 
Integer.MAX_VALUE);
     RaftServerConfigKeys.setStorageDir(properties, storageDir);
     RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
     RaftServerConfigKeys.Write.setByteLimit(properties, 
SizeInBytes.valueOf("1000MB"));
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index a9dddbb..4e95c1a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -20,11 +20,9 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.conf.ConfUtils.*;
@@ -198,55 +196,6 @@ public interface GrpcConfigKeys {
     }
   }
 
-  interface OutputStream {
-    Logger LOG = LoggerFactory.getLogger(OutputStream.class);
-    static Consumer<String> getDefaultLog() {
-      return LOG::debug;
-    }
-
-    String PREFIX = GrpcConfigKeys.PREFIX + ".outputstream";
-
-    String BUFFER_SIZE_KEY = PREFIX + ".buffer.size";
-    SizeInBytes BUFFER_SIZE_DEFAULT = SizeInBytes.valueOf("64KB");
-    static SizeInBytes bufferSize(RaftProperties properties) {
-      return getSizeInBytes(properties::getSizeInBytes,
-          BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT, getDefaultLog());
-    }
-    static void setBufferSize(RaftProperties properties, SizeInBytes 
bufferSize) {
-      setSizeInBytes(properties::set, BUFFER_SIZE_KEY, bufferSize);
-    }
-
-    String RETRY_TIMES_KEY = PREFIX + ".retry.times";
-    int RETRY_TIMES_DEFAULT = 5;
-    static int retryTimes(RaftProperties properties) {
-      return getInt(properties::getInt,
-          RETRY_TIMES_KEY, RETRY_TIMES_DEFAULT, getDefaultLog(), 
requireMin(1));
-    }
-    static void setRetryTimes(RaftProperties properties, int retryTimes) {
-      setInt(properties::setInt, RETRY_TIMES_KEY, retryTimes);
-    }
-
-    String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval";
-    TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, 
TimeUnit.MILLISECONDS);
-    static TimeDuration retryInterval(RaftProperties properties) {
-      return 
getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
-          RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog());
-    }
-    static void setRetryInterval(RaftProperties properties, TimeDuration 
retryInterval) {
-      setTimeDuration(properties::setTimeDuration, RETRY_INTERVAL_KEY, 
retryInterval);
-    }
-
-    String OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".outstanding.appends.max";
-    int OUTSTANDING_APPENDS_MAX_DEFAULT = 128;
-    static int outstandingAppendsMax(RaftProperties properties) {
-      return getInt(properties::getInt,
-          OUTSTANDING_APPENDS_MAX_KEY, OUTSTANDING_APPENDS_MAX_DEFAULT, 
getDefaultLog(), requireMin(0));
-    }
-    static void setOutstandingAppendsMax(RaftProperties properties, int 
maxOutstandingAppends) {
-      setInt(properties::setInt, OUTSTANDING_APPENDS_MAX_KEY, 
maxOutstandingAppends);
-    }
-  }
-
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
   SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
   static SizeInBytes messageSizeMax(RaftProperties properties, 
Consumer<String> logger) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
deleted file mode 100644
index fcdff58..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.grpc.GrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public class GrpcClientStreamer implements Closeable {
-  public static final Logger LOG = 
LoggerFactory.getLogger(GrpcClientStreamer.class);
-
-  enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
-
-  private static class ExceptionAndRetry {
-    private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
-    private final AtomicInteger retryTimes = new AtomicInteger(0);
-    private final int maxRetryTimes;
-    private final TimeDuration retryInterval;
-
-    ExceptionAndRetry(RaftProperties prop) {
-      maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
-      retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
-    }
-
-    void addException(RaftPeerId peer, IOException e) {
-      exceptionMap.put(peer, e);
-      retryTimes.incrementAndGet();
-    }
-
-    IOException getCombinedException() {
-      return new IOException("Exceptions: " + exceptionMap);
-    }
-
-    boolean shouldRetry() {
-      return retryTimes.get() <= maxRetryTimes;
-    }
-  }
-
-  private final Deque<RaftClientRequestProto> dataQueue;
-  private final Deque<RaftClientRequestProto> ackQueue;
-  private final int maxPendingNum;
-  private final SizeInBytes maxMessageSize;
-
-  private final PeerProxyMap<GrpcClientProtocolProxy> proxyMap;
-  private final Map<RaftPeerId, RaftPeer> peers;
-  private RaftPeerId leaderId;
-  private volatile GrpcClientProtocolProxy leaderProxy;
-  private final ClientId clientId;
-  private final String name;
-
-  private volatile RunningState running = RunningState.RUNNING;
-  private final ExceptionAndRetry exceptionAndRetry;
-  private final Sender senderThread;
-  private final RaftGroupId groupId;
-
-  GrpcClientStreamer(RaftProperties prop, RaftGroup group,
-      RaftPeerId leaderId, ClientId clientId, GrpcTlsConfig tlsConfig) {
-    this.clientId = clientId;
-    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + clientId;
-    maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
-    maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
-    dataQueue = new ConcurrentLinkedDeque<>();
-    ackQueue = new ConcurrentLinkedDeque<>();
-    exceptionAndRetry = new ExceptionAndRetry(prop);
-
-    this.groupId = group.getGroupId();
-    this.peers = group.getPeers().stream().collect(
-        Collectors.toMap(RaftPeer::getId, Function.identity()));
-    proxyMap = new PeerProxyMap<>(clientId.toString(),
-        raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer,
-            ResponseHandler::new, prop, tlsConfig));
-    proxyMap.addRaftPeers(group.getPeers());
-    refreshLeaderProxy(leaderId, null);
-
-    senderThread = new Sender();
-    senderThread.setName(this.toString() + "-sender");
-    senderThread.start();
-  }
-
-  private synchronized void refreshLeaderProxy(RaftPeerId suggested,
-      RaftPeerId oldLeader) {
-    if (suggested != null) {
-      leaderId = suggested;
-    } else {
-      if (oldLeader == null) {
-        leaderId = peers.keySet().iterator().next();
-      } else {
-        leaderId = CollectionUtils.random(oldLeader, peers.keySet());
-        if (leaderId == null) {
-          leaderId = oldLeader;
-        }
-      }
-    }
-    LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
-          oldLeader, leaderId, suggested);
-    if (leaderProxy != null) {
-      leaderProxy.closeCurrentSession();
-    }
-    try {
-      leaderProxy = proxyMap.getProxy(leaderId);
-    } catch (IOException e) {
-      LOG.error("Should not hit IOException here", e);
-      refreshLeader(null, leaderId);
-    }
-  }
-
-  private boolean isRunning() {
-    return running == RunningState.RUNNING ||
-        running == RunningState.LOOK_FOR_LEADER;
-  }
-
-  private void checkState() throws IOException {
-    if (!isRunning()) {
-      throwException("The GrpcClientStreamer has been closed");
-    }
-  }
-
-  synchronized void write(ByteString content, long seqNum)
-      throws IOException {
-    checkState();
-    while (isRunning() && dataQueue.size() >= maxPendingNum) {
-      try {
-        wait();
-      } catch (InterruptedException ignored) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    if (isRunning()) {
-      // wrap the current buffer into a RaftClientRequestProto
-      final RaftClientRequestProto request = 
ClientProtoUtils.toRaftClientRequestProto(
-          clientId, leaderId, groupId, seqNum, seqNum, content);
-      if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
-        throw new IOException("msg size:" + request.getSerializedSize() +
-            " exceeds maximum:" + maxMessageSize.getSizeInt());
-      }
-      dataQueue.offer(request);
-      this.notifyAll();
-    } else {
-      throwException(this + " got closed.");
-    }
-  }
-
-  synchronized void flush() throws IOException {
-    checkState();
-    if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
-      return;
-    }
-    // wait for the pending Q to become empty
-    while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
-      try {
-        wait();
-      } catch (InterruptedException ignored) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
-      throwException(this + " got closed before finishing flush");
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!isRunning()) {
-      return;
-    }
-    flush();
-
-    running = RunningState.CLOSED;
-    senderThread.interrupt();
-    try {
-      senderThread.join();
-    } catch (InterruptedException ignored) {
-      Thread.currentThread().interrupt();
-    }
-    proxyMap.close();
-  }
-
-  @Override
-  public String toString() {
-    return name;
-  }
-
-  private class Sender extends Daemon {
-    @Override
-    public void run() {
-      while (isRunning()) {
-
-        synchronized (GrpcClientStreamer.this) {
-          while (isRunning() && shouldWait()) {
-            try {
-              GrpcClientStreamer.this.wait();
-            } catch (InterruptedException ignored) {
-              Thread.currentThread().interrupt();
-            }
-          }
-          if (running == RunningState.RUNNING) {
-            Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is 
empty");
-            RaftClientRequestProto next = dataQueue.poll();
-            leaderProxy.onNext(next);
-            ackQueue.offer(next);
-          }
-        }
-      }
-    }
-
-    private boolean shouldWait() {
-      // the sender should wait if any of the following is true
-      // 1) there is no data to send
-      // 2) there are too many outstanding pending requests
-      // 3) Error/NotLeaderException just happened, we're still waiting for
-      //    the first response to confirm the new leader
-      return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
-          running == RunningState.LOOK_FOR_LEADER;
-    }
-  }
-
-  /** the response handler for stream RPC */
-  private class ResponseHandler implements
-      GrpcClientProtocolProxy.CloseableStreamObserver {
-    private final RaftPeerId targetId;
-    // once handled the first NotLeaderException or Error, the handler should
-    // be inactive and should not make any further action.
-    private volatile boolean active = true;
-
-    ResponseHandler(RaftPeer target) {
-      targetId = target.getId();
-    }
-
-    @Override
-    public String toString() {
-      return GrpcClientStreamer.this + "-ResponseHandler-" + targetId;
-    }
-
-    @Override
-    public void onNext(RaftClientReplyProto reply) {
-      if (!active) {
-        return;
-      }
-      synchronized (GrpcClientStreamer.this) {
-        RaftClientRequestProto pending = 
Objects.requireNonNull(ackQueue.peek());
-        if (reply.getRpcReply().getSuccess()) {
-          Preconditions.assertTrue(pending.getRpcRequest().getCallId() == 
reply.getRpcReply().getCallId(),
-              () -> "pending=" + ClientProtoUtils.toString(pending) + " but 
reply=" + ClientProtoUtils.toString(reply));
-          ackQueue.poll();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("{} received success ack for {}", this, 
ClientProtoUtils.toString(pending));
-          }
-          // we've identified the correct leader
-          if (running == RunningState.LOOK_FOR_LEADER) {
-            running = RunningState.RUNNING;
-          }
-        } else {
-          // this may be a NotLeaderException
-          RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
-          final NotLeaderException nle = r.getNotLeaderException();
-          if (nle != null) {
-            LOG.debug("{} received a NotLeaderException from {}", this,
-                r.getServerId());
-            handleNotLeader(nle, targetId);
-          }
-        }
-        GrpcClientStreamer.this.notifyAll();
-      }
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      LOG.warn(this + " onError", t);
-      if (active) {
-        synchronized (GrpcClientStreamer.this) {
-          handleError(t, this);
-          GrpcClientStreamer.this.notifyAll();
-        }
-      }
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} onCompleted, pending requests #: {}", this,
-          ackQueue.size());
-    }
-
-    @Override // called by handleError and handleNotLeader
-    public void close() {
-      active = false;
-    }
-  }
-
-  private void throwException(String msg) throws IOException {
-    if (running == RunningState.ERROR) {
-      throw exceptionAndRetry.getCombinedException();
-    } else {
-      throw new IOException(msg);
-    }
-  }
-
-  private void handleNotLeader(NotLeaderException nle,
-      RaftPeerId oldLeader) {
-    Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this));
-    // handle NotLeaderException: refresh leader and RaftConfiguration
-    refreshPeers(nle.getPeers());
-
-    refreshLeader(nle.getSuggestedLeader().getId(), oldLeader);
-  }
-
-  private synchronized void handleError(Throwable t, ResponseHandler handler) {
-    final IOException e = GrpcUtil.unwrapIOException(t);
-
-    exceptionAndRetry.addException(handler.targetId, e);
-    LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
-        handler, e, exceptionAndRetry.retryTimes.get(),
-        exceptionAndRetry.maxRetryTimes);
-
-    leaderProxy.onError();
-    if (exceptionAndRetry.shouldRetry()) {
-      refreshLeader(null, leaderId);
-    } else {
-      running = RunningState.ERROR;
-    }
-  }
-
-  private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) 
{
-    running = RunningState.LOOK_FOR_LEADER;
-    refreshLeaderProxy(suggestedLeader, oldLeader);
-    reQueuePendingRequests(leaderId);
-
-    final RaftClientRequestProto request = Objects.requireNonNull(
-        dataQueue.poll());
-    ackQueue.offer(request);
-    try {
-      exceptionAndRetry.retryInterval.sleep();
-    } catch (InterruptedException ignored) {
-      Thread.currentThread().interrupt();
-    }
-    leaderProxy.onNext(request);
-  }
-
-  private void reQueuePendingRequests(RaftPeerId newLeader) {
-    if (isRunning()) {
-      // resend all the pending requests
-      while (!ackQueue.isEmpty()) {
-        final RaftClientRequestProto oldRequest = ackQueue.pollLast();
-        final RaftRpcRequestProto.Builder newRpc = 
RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest())
-            .setReplyId(newLeader.toByteString());
-        final RaftClientRequestProto newRequest = 
RaftClientRequestProto.newBuilder(oldRequest)
-            .setRpcRequest(newRpc).build();
-        dataQueue.offerFirst(newRequest);
-      }
-    }
-  }
-
-  private void refreshPeers(Collection<RaftPeer> newPeers) {
-    if (newPeers != null && newPeers.size() > 0) {
-      // we only add new peers, we do not remove any peer even if it no longer
-      // belongs to the current raft conf
-      newPeers.forEach(peer -> {
-        peers.putIfAbsent(peer.getId(), peer);
-        proxyMap.computeIfAbsent(peer);
-      });
-
-      LOG.debug("refreshed peers: {}", peers);
-    }
-  }
-}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
deleted file mode 100644
index af1b1f1..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.ProtoUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class GrpcOutputStream extends OutputStream {
-  /** internal buffer */
-  private final byte[] buf;
-  private int count;
-  private final AtomicLong seqNum = new AtomicLong();
-  private final ClientId clientId;
-  private final String name;
-  private final GrpcClientStreamer streamer;
-
-  private boolean closed = false;
-
-  public GrpcOutputStream(RaftProperties prop, ClientId clientId,
-      RaftGroup group, RaftPeerId leaderId, GrpcTlsConfig tlsConfig) {
-    final int bufferSize = 
GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
-    buf = new byte[bufferSize];
-    count = 0;
-    this.clientId = clientId;
-    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + clientId;
-    streamer = new GrpcClientStreamer(prop, group, leaderId, clientId, 
tlsConfig);
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    checkClosed();
-    buf[count++] = (byte)b;
-    flushIfNecessary();
-  }
-
-  private void flushIfNecessary() throws IOException {
-    if(count == buf.length) {
-      flushToStreamer();
-    }
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    checkClosed();
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-
-    int total = 0;
-    while (total < len) {
-      int toWrite = Math.min(len - total, buf.length - count);
-      System.arraycopy(b, off + total, buf, count, toWrite);
-      count += toWrite;
-      total += toWrite;
-      flushIfNecessary();
-    }
-  }
-
-  private void flushToStreamer() throws IOException {
-    if (count > 0) {
-      streamer.write(ProtoUtils.toByteString(buf, 0, count),
-          seqNum.getAndIncrement());
-      count = 0;
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    checkClosed();
-    flushToStreamer();
-    streamer.flush();
-  }
-
-  @Override
-  public void close() throws IOException {
-    flushToStreamer();
-    streamer.close(); // streamer will flush
-    this.closed = true;
-  }
-
-  @Override
-  public String toString() {
-    return name;
-  }
-
-  private void checkClosed() throws IOException {
-    if (closed) {
-      throw new IOException(this.toString() + " was closed.");
-    }
-  }
-}
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 8861e2a..d86170d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -262,7 +262,6 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
   /**
    * Write while leader is killed
    */
-  @Ignore
   @Test
   public void testKillLeader() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestKillLeader);
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
deleted file mode 100644
index 87cf758..0000000
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.OutputStreamBaseTest;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.GrpcClientStreamer;
-import org.apache.ratis.grpc.client.GrpcOutputStream;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.util.Log4jUtils;
-import org.apache.ratis.util.SizeInBytes;
-
-import java.io.OutputStream;
-
-/**
- * Test {@link GrpcOutputStream}
- */
-public class TestGrpcOutputStream
-    extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
-    implements MiniRaftClusterWithGrpc.FactoryGet {
-
-  {
-    final RaftProperties p = getProperties();
-    GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(p, false);
-    GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(p, 8);
-  }
-
-  @Override
-  public OutputStream newOutputStream(MiniRaftClusterWithGrpc cluster, int 
bufferSize) {
-    final RaftProperties p = getProperties();
-    GrpcConfigKeys.OutputStream.setBufferSize(p, 
SizeInBytes.valueOf(bufferSize));
-    return new GrpcOutputStream(p, ClientId.randomId(), cluster.getGroup(), 
cluster.getLeader().getId(), null);
-  }
-}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
index fc5c91d..fb35d95 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
@@ -24,6 +24,6 @@ public class TestRaftOutputStreamWithGrpc
     implements MiniRaftClusterWithGrpc.FactoryGet {
   @Override
   public int getGlobalTimeoutSeconds() {
-    return 30;
+    return 100;
   }
 }

Reply via email to