HDDS-675. Add blocking buffer and use watchApi for flush/close in OzoneClient. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/671fd652 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/671fd652 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/671fd652 Branch: refs/heads/trunk Commit: 671fd6524b2640474de2bc3b8dbaa0a3cf7fcf01 Parents: 75291e6 Author: Shashikant Banerjee <[email protected]> Authored: Tue Nov 13 23:39:14 2018 +0530 Committer: Shashikant Banerjee <[email protected]> Committed: Tue Nov 13 23:39:14 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 28 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 65 ++- .../hdds/scm/storage/ChunkOutputStream.java | 448 +++++++++++++++---- .../hdds/scm/XceiverClientAsyncReply.java | 98 ++++ .../hadoop/hdds/scm/XceiverClientSpi.java | 12 +- .../scm/storage/ContainerProtocolCalls.java | 57 ++- .../apache/hadoop/ozone/OzoneConfigKeys.java | 24 +- .../common/src/main/resources/ozone-default.xml | 26 +- .../keyvalue/impl/BlockManagerImpl.java | 3 + .../hadoop/ozone/client/OzoneClientUtils.java | 27 -- .../ozone/client/io/ChunkGroupOutputStream.java | 337 +++++++------- .../hadoop/ozone/client/rpc/RpcClient.java | 27 +- .../apache/hadoop/ozone/MiniOzoneCluster.java | 45 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 19 + .../apache/hadoop/ozone/RatisTestHelper.java | 2 +- .../rpc/TestCloseContainerHandlingByClient.java | 252 +++-------- .../rpc/TestContainerStateMachineFailures.java | 20 +- .../client/rpc/TestFailureHandlingByClient.java | 213 +++++++++ .../ozone/container/ContainerTestHelper.java | 34 ++ .../container/ozoneimpl/TestOzoneContainer.java | 2 +- .../ozone/scm/TestXceiverClientMetrics.java | 3 +- .../ozone/web/TestOzoneRestWithMiniCluster.java | 2 +- .../web/storage/DistributedStorageHandler.java | 42 +- .../hadoop/ozone/freon/TestDataValidate.java | 6 + .../ozone/freon/TestRandomKeyGenerator.java | 6 + 25 files changed, 1248 
insertions(+), 550 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index cc34e27..9acd832 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -47,6 +48,7 @@ import java.util.Map; import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A Client for the storageContainer protocol. @@ -163,7 +165,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { // In case the command gets retried on a 2nd datanode, // sendCommandAsyncCall will create a new channel and async stub // in case these don't exist for the specific datanode. 
- responseProto = sendCommandAsync(request, dn).get(); + responseProto = sendCommandAsync(request, dn).getResponse().get(); if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { break; } @@ -197,13 +199,23 @@ public class XceiverClientGrpc extends XceiverClientSpi { * @throws IOException */ @Override - public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( + public XceiverClientAsyncReply sendCommandAsync( ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - return sendCommandAsync(request, pipeline.getFirstNode()); + XceiverClientAsyncReply asyncReply = + sendCommandAsync(request, pipeline.getFirstNode()); + + // TODO : for now make this API sync in nature as async requests are + // served out of order over XceiverClientGrpc. This needs to be fixed + // if this API is to be used for I/O path. Currently, this is not + // used for Read/Write Operation but for tests. + if (!HddsUtils.isReadOnly(request)) { + asyncReply.getResponse().get(); + } + return asyncReply; } - private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( + private XceiverClientAsyncReply sendCommandAsync( ContainerCommandRequestProto request, DatanodeDetails dn) throws IOException, ExecutionException, InterruptedException { if (closed) { @@ -257,7 +269,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { }); requestObserver.onNext(request); requestObserver.onCompleted(); - return replyFuture; + return new XceiverClientAsyncReply(replyFuture); } private void reconnect(DatanodeDetails dn) @@ -288,6 +300,12 @@ public class XceiverClientGrpc extends XceiverClientSpi { // For stand alone pipeline, there is no notion called destroy pipeline. } + @Override + public void watchForCommit(long index, long timeout) + throws InterruptedException, ExecutionException, TimeoutException { + // there is no notion of watch for commit index in standalone pipeline + }; + /** * Returns pipeline Type. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index f38fd3b..e4b711a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -50,9 +50,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; /** @@ -192,9 +195,22 @@ public final class XceiverClientRatis extends XceiverClientSpi { getClient().sendAsync(() -> byteString); } - public void watchForCommit(long index, long timeout) throws Exception { - getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED) - .get(timeout, TimeUnit.MILLISECONDS); + @Override + public void watchForCommit(long index, long timeout) + throws InterruptedException, ExecutionException, TimeoutException { + // TODO: Create a new Raft client instance to watch + CompletableFuture<RaftClientReply> replyFuture = getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); + try { + replyFuture.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException toe) { + LOG.warn("3 way commit failed ", toe); + getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) + .get(timeout, TimeUnit.MILLISECONDS); + LOG.info("Could not commit " + index + 
" to all the nodes." + + "Committed by majority."); + } } /** * Sends a given command to server gets a waitable future back. @@ -204,18 +220,37 @@ public final class XceiverClientRatis extends XceiverClientSpi { * @throws IOException */ @Override - public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( + public XceiverClientAsyncReply sendCommandAsync( ContainerCommandRequestProto request) { - return sendRequestAsync(request).whenComplete((reply, e) -> - LOG.debug("received reply {} for request: {} exception: {}", request, - reply, e)) - .thenApply(reply -> { - try { - return ContainerCommandResponseProto.parseFrom( - reply.getMessage().getContent()); - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - }); + XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null); + CompletableFuture<RaftClientReply> raftClientReply = + sendRequestAsync(request); + Collection<XceiverClientAsyncReply.CommitInfo> commitInfos = + new ArrayList<>(); + CompletableFuture<ContainerCommandResponseProto> containerCommandResponse = + raftClientReply.whenComplete((reply, e) -> LOG + .debug("received reply {} for request: {} exception: {}", request, + reply, e)) + .thenApply(reply -> { + try { + ContainerCommandResponseProto response = + ContainerCommandResponseProto + .parseFrom(reply.getMessage().getContent()); + reply.getCommitInfos().forEach(e -> { + XceiverClientAsyncReply.CommitInfo commitInfo = + new XceiverClientAsyncReply.CommitInfo( + e.getServer().getAddress(), e.getCommitIndex()); + commitInfos.add(commitInfo); + asyncReply.setCommitInfos(commitInfos); + asyncReply.setLogIndex(reply.getLogIndex()); + }); + return response; + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); + asyncReply.setResponse(containerCommandResponse); + return asyncReply; } + } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 4e881c4..bdc6a83 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hdds.scm.storage; - - import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -29,16 +29,24 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.client.BlockID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.UUID; - +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .putBlock; + 
.putBlockAsync; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .writeChunk; + .writeChunkAsync; /** * An {@link OutputStream} used by the REST service in combination with the @@ -57,6 +65,8 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls * through to the container. */ public class ChunkOutputStream extends OutputStream { + public static final Logger LOG = + LoggerFactory.getLogger(ChunkOutputStream.class); private BlockID blockID; private final String key; @@ -64,67 +74,97 @@ public class ChunkOutputStream extends OutputStream { private final BlockData.Builder containerBlockData; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; - private ByteBuffer buffer; private final String streamId; private int chunkIndex; private int chunkSize; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private ByteBuffer buffer; + // The IOException will be set by response handling thread in case there is an + // exception received in the response. If the exception is set, the next + // request will fail upfront. + private IOException ioException; + private ExecutorService responseExecutor; + + // position of the buffer where the last flush was attempted + private int lastFlushPos; + + // position of the buffer till which the flush was successfully + // acknowledged by all nodes in pipeline + private int lastSuccessfulFlushIndex; + + // list to hold up all putBlock futures + private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> + futureList; + // list maintaining commit indexes for putBlocks + private List<Long> commitIndexList; /** * Creates a new ChunkOutputStream. 
* - * @param blockID block ID - * @param key chunk key + * @param blockID block ID + * @param key chunk key * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param traceID container protocol call args - * @param chunkSize chunk size + * @param xceiverClient client to perform container calls + * @param traceID container protocol call args + * @param chunkSize chunk size */ public ChunkOutputStream(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String traceID, int chunkSize) { + XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, + String traceID, int chunkSize, long streamBufferFlushSize, + long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) { this.blockID = blockID; this.key = key; this.traceID = traceID; this.chunkSize = chunkSize; - KeyValue keyValue = KeyValue.newBuilder() - .setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = BlockData.newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .addMetadata(keyValue); + KeyValue keyValue = + KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); + this.containerBlockData = + BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; - } + this.streamBufferFlushSize = streamBufferFlushSize; + this.streamBufferMaxSize = streamBufferMaxSize; + this.watchTimeout = watchTimeout; + this.buffer = buffer; + this.ioException = null; - public ByteBuffer getBuffer() { - return buffer; + // A single thread executor handle the responses of async requests + responseExecutor = Executors.newSingleThreadExecutor(); + commitIndexList = new ArrayList<>(); + lastSuccessfulFlushIndex = 0; + 
futureList = new ArrayList<>(); + lastFlushPos = 0; } public BlockID getBlockID() { return blockID; } + public int getLastSuccessfulFlushIndex() { + return lastSuccessfulFlushIndex; + } + + @Override public void write(int b) throws IOException { checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put((byte)b); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } + byte[] buf = new byte[1]; + buf[0] = (byte) b; + write(buf, 0, 1); } @Override - public void write(byte[] b, int off, int len) - throws IOException { + public void write(byte[] b, int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { @@ -132,93 +172,300 @@ public class ChunkOutputStream extends OutputStream { } checkOpen(); while (len > 0) { - int writeLen = Math.min(chunkSize - buffer.position(), len); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); + int writeLen; + writeLen = Math.min(chunkSize - buffer.position() % chunkSize, len); buffer.put(b, off, writeLen); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); + if (buffer.position() % chunkSize == 0) { + int pos = buffer.position() - chunkSize; + int limit = buffer.position(); + writeChunk(pos, limit); } off += writeLen; len -= writeLen; + if (buffer.position() >= streamBufferFlushSize + && buffer.position() % streamBufferFlushSize == 0) { + + lastFlushPos = buffer.position(); + futureList.add(handlePartialFlush()); + } + if (buffer.position() >= streamBufferMaxSize + && buffer.position() % streamBufferMaxSize == 0) { + handleFullBuffer(); + } + } + } + + /** + * Will be called on 
the retryPath in case closedContainerException/ + * TimeoutException. + * @param len length of data to write + * @throws IOException if error occured + */ + + // In this case, the data is already cached in the buffer. + public void writeOnRetry(int len) throws IOException { + if (len == 0) { + return; + } + int off = 0; + checkOpen(); + while (len > 0) { + int writeLen; + writeLen = Math.min(chunkSize, len); + if (writeLen == chunkSize) { + int pos = off; + int limit = pos + chunkSize; + writeChunk(pos, limit); + } + off += writeLen; + len -= writeLen; + if (off % streamBufferFlushSize == 0) { + lastFlushPos = off; + futureList.add(handlePartialFlush()); + } + if (off % streamBufferMaxSize == 0) { + handleFullBuffer(); + } + } + } + + private void handleResponse( + ContainerProtos.ContainerCommandResponseProto response, + XceiverClientAsyncReply asyncReply) { + validateResponse(response); + discardBuffer(asyncReply); + } + + private void discardBuffer(XceiverClientAsyncReply asyncReply) { + if (!commitIndexList.isEmpty()) { + long index = commitIndexList.get(0); + if (checkIfBufferDiscardRequired(asyncReply, index)) { + updateFlushIndex(); + } + } + } + + /** + * just update the lastSuccessfulFlushIndex. Since we have allocated + * the buffer more than the streamBufferMaxSize, we can keep on writing + * to the buffer. In case of failure, we will read the data starting from + * lastSuccessfulFlushIndex. + */ + private void updateFlushIndex() { + lastSuccessfulFlushIndex += streamBufferFlushSize; + LOG.debug("Discarding buffer till pos " + lastSuccessfulFlushIndex); + if (!commitIndexList.isEmpty()) { + commitIndexList.remove(0); + futureList.remove(0); + } + + } + /** + * Check if the last commitIndex stored at the beginning of the + * commitIndexList is less than equal to current commitInfo indexes. + * If its true, the buffer has been successfully flushed till the + * last position where flush happened. 
+ */ + private boolean checkIfBufferDiscardRequired( + XceiverClientAsyncReply asyncReply, long commitIndex) { + if (asyncReply.getCommitInfos() != null) { + for (XceiverClientAsyncReply.CommitInfo info : asyncReply + .getCommitInfos()) { + if (info.getCommitIndex() < commitIndex) { + return false; + } + } + } + return true; + } + + /** + * This is a blocking call.It will wait for the flush till the commit index + * at the head of the commitIndexList gets replicated to all or majority. + * @throws IOException + */ + private void handleFullBuffer() throws IOException { + if (!commitIndexList.isEmpty()) { + watchForCommit(commitIndexList.get(0)); + } + } + + /** + * calls watchForCommit API of the Ratis Client. For Standalone client, + * it is a no op. + * @param commitIndex log index to watch for + * @throws IOException IOException in case watch gets timed out + */ + private void watchForCommit(long commitIndex) throws IOException { + checkOpen(); + Preconditions.checkState(!commitIndexList.isEmpty()); + try { + xceiverClient.watchForCommit(commitIndex, watchTimeout); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + LOG.warn("watchForCommit failed for index " + commitIndex, e); + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + } + + private CompletableFuture<ContainerProtos. 
+ ContainerCommandResponseProto> handlePartialFlush() + throws IOException { + String requestId = + traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID; + try { + XceiverClientAsyncReply asyncReply = + putBlockAsync(xceiverClient, containerBlockData.build(), requestId); + CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future = + asyncReply.getResponse(); + + return future.thenApplyAsync(e -> { + handleResponse(e, asyncReply); + // if the ioException is not set, putBlock is successful + if (ioException == null) { + LOG.debug( + "Adding index " + asyncReply.getLogIndex() + " commitList size " + + commitIndexList.size()); + BlockID responseBlockID = BlockID.getFromProtobuf( + e.getPutBlock().getCommittedBlockLength().getBlockID()); + Preconditions.checkState(blockID.getContainerBlockID() + .equals(responseBlockID.getContainerBlockID())); + // updates the bcsId of the block + blockID = responseBlockID; + long index = asyncReply.getLogIndex(); + // for standalone protocol, logIndex will always be 0. 
+ if (index != 0) { + commitIndexList.add(index); + } else { + updateFlushIndex(); + } + } + return e; + }, responseExecutor); + } catch (IOException | InterruptedException | ExecutionException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); } } @Override public void flush() throws IOException { - checkOpen(); - if (buffer.position() > 0) { - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - flushBufferToChunk(rollbackPosition, rollbackLimit); + if (xceiverClientManager != null && xceiverClient != null + && buffer != null) { + checkOpen(); + if (buffer.position() > 0 && lastSuccessfulFlushIndex != buffer + .position()) { + try { + + // flush the last chunk data residing on the buffer + if (buffer.position() % chunkSize > 0) { + int pos = buffer.position() - (buffer.position() % chunkSize); + writeChunk(pos, buffer.position()); + } + if (lastFlushPos != buffer.position()) { + lastFlushPos = buffer.position(); + handlePartialFlush(); + } + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + futureList.toArray(new CompletableFuture[futureList.size()])); + combinedFuture.get(); + // just check again if the exception is hit while waiting for the + // futures to ensure flush has indeed succeeded + checkOpen(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + } } } + private void writeChunk(int pos, int limit) throws IOException { + // Please note : We are not flipping the slice when we write since + // the slices are pointing the buffer start and end as needed for + // the chunk write. Also please note, Duplicate does not create a + // copy of data, it only creates metadata that points to the data + // stream. 
+ ByteBuffer chunk = buffer.duplicate(); + chunk.position(pos); + chunk.limit(limit); + writeChunkToContainer(chunk); + } + @Override public void close() throws IOException { if (xceiverClientManager != null && xceiverClient != null && buffer != null) { - if (buffer.position() > 0) { - writeChunkToContainer(); - } try { - ContainerProtos.PutBlockResponseProto responseProto = - putBlock(xceiverClient, containerBlockData.build(), traceID); - BlockID responseBlockID = BlockID.getFromProtobuf( - responseProto.getCommittedBlockLength().getBlockID()); - Preconditions.checkState(blockID.getContainerBlockID() - .equals(responseBlockID.getContainerBlockID())); - // updates the bcsId of the block - blockID = responseBlockID; - } catch (IOException e) { + if (buffer.position() > lastFlushPos) { + int pos = buffer.position() - (buffer.position() % chunkSize); + writeChunk(pos, buffer.position()); + futureList.add(handlePartialFlush()); + } + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + futureList.toArray(new CompletableFuture[futureList.size()])); + + // wait for all the transactions to complete + combinedFuture.get(); + + // irrespective of whether the commitIndexList is empty or not, + // ensure there is no exception set(For Standalone Protocol) + checkOpen(); + if (!commitIndexList.isEmpty()) { + // wait for the last commit index in the commitIndexList to get + // committed to all or majority of nodes in case timeout happens. 
+ long lastIndex = commitIndexList.get(commitIndexList.size() - 1); + LOG.debug( + "waiting for last flush Index " + lastIndex + " to catch up"); + watchForCommit(lastIndex); + updateFlushIndex(); + } + } catch (InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { cleanup(); } } + // clear the buffer + buffer.clear(); + } + + private void validateResponse( + ContainerProtos.ContainerCommandResponseProto responseProto) { + try { + ContainerProtocolCalls.validateContainerResponse(responseProto); + } catch (StorageContainerException sce) { + ioException = new IOException( + "Unexpected Storage Container Exception: " + sce.toString(), sce); + } } public void cleanup() { - xceiverClientManager.releaseClient(xceiverClient); + if (xceiverClientManager != null) { + xceiverClientManager.releaseClient(xceiverClient); + } xceiverClientManager = null; xceiverClient = null; - buffer = null; + if (futureList != null) { + futureList.clear(); + } + futureList = null; + commitIndexList = null; + responseExecutor.shutdown(); } /** - * Checks if the stream is open. If not, throws an exception. + * Checks if the stream is open or exception has occured. + * If not, throws an exception. * * @throws IOException if stream is closed */ private void checkOpen() throws IOException { if (xceiverClient == null) { throw new IOException("ChunkOutputStream has been closed."); - } - } - - /** - * Attempts to flush buffered writes by writing a new chunk to the container. - * If successful, then clears the buffer to prepare to receive writes for a - * new chunk. 
- * - * @param rollbackPosition position to restore in buffer if write fails - * @param rollbackLimit limit to restore in buffer if write fails - * @throws IOException if there is an I/O error while performing the call - */ - private void flushBufferToChunk(int rollbackPosition, - int rollbackLimit) throws IOException { - boolean success = false; - try { - writeChunkToContainer(); - success = true; - } finally { - if (success) { - buffer.clear(); - } else { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - } + } else if (ioException != null) { + throw ioException; } } @@ -228,23 +475,32 @@ public class ChunkOutputStream extends OutputStream { * * @throws IOException if there is an I/O error while performing the call */ - private void writeChunkToContainer() throws IOException { - buffer.flip(); - ByteString data = ByteString.copyFrom(buffer); - ChunkInfo chunk = ChunkInfo - .newBuilder() - .setChunkName( - DigestUtils.md5Hex(key) + "_stream_" - + streamId + "_chunk_" + ++chunkIndex) - .setOffset(0) - .setLen(data.size()) - .build(); + private void writeChunkToContainer(ByteBuffer chunk) throws IOException { + int effectiveChunkSize = chunk.remaining(); + ByteString data = ByteString.copyFrom(chunk); + ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName( + DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_" + + ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build(); + // generate a unique requestId + String requestId = + traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo + .getChunkName(); try { - writeChunk(xceiverClient, chunk, blockID, data, traceID); - } catch (IOException e) { + XceiverClientAsyncReply asyncReply = + writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId); + CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future = + asyncReply.getResponse(); + future.thenApplyAsync(e -> { + handleResponse(e, asyncReply); + return e; + }, responseExecutor); + } catch 
(IOException | InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } - containerBlockData.addChunks(chunk); + LOG.debug( + "writing chunk " + chunkInfo.getChunkName() + " blockID " + blockID + + " length " + chunk.remaining()); + containerBlockData.addChunks(chunkInfo); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java new file mode 100644 index 0000000..0d7e1bc --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * This class represents the Async reply from XceiverClient. + */ +public class XceiverClientAsyncReply { + + private CompletableFuture<ContainerCommandResponseProto> response; + private Long logIndex; + private Collection<CommitInfo> commitInfos; + + public XceiverClientAsyncReply( + CompletableFuture<ContainerCommandResponseProto> response) { + this(response, 0, null); + } + + public XceiverClientAsyncReply( + CompletableFuture<ContainerCommandResponseProto> response, long index, + Collection<CommitInfo> commitInfos) { + this.commitInfos = commitInfos; + this.logIndex = index; + this.response = response; + } + + /** + * A class having details about latest commitIndex for each server in the + * Ratis pipeline. For Standalone pipeline, commitInfo will be null. 
+ */ + public static class CommitInfo { + + private final String server; + + private final Long commitIndex; + + public CommitInfo(String server, long commitIndex) { + this.server = server; + this.commitIndex = commitIndex; + } + + public String getServer() { + return server; + } + + public long getCommitIndex() { + return commitIndex; + } + } + + public Collection<CommitInfo> getCommitInfos() { + return commitInfos; + } + + public CompletableFuture<ContainerCommandResponseProto> getResponse() { + return response; + } + + public long getLogIndex() { + return logIndex; + } + + public void setCommitInfos(Collection<CommitInfo> commitInfos) { + this.commitInfos = commitInfos; + } + + public void setLogIndex(Long logIndex) { + this.logIndex = logIndex; + } + + public void setResponse( + CompletableFuture<ContainerCommandResponseProto> response) { + this.response = response; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index b36315e..9eb49ae 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -28,8 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -98,7 +98,10 @@ public abstract class XceiverClientSpi implements Closeable { public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws 
IOException { try { - return sendCommandAsync(request).get(); + XceiverClientAsyncReply reply; + reply = sendCommandAsync(request); + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + return responseProto; } catch (ExecutionException | InterruptedException e) { throw new IOException("Failed to command " + request, e); } @@ -111,7 +114,7 @@ public abstract class XceiverClientSpi implements Closeable { * @return Response to the command * @throws IOException */ - public abstract CompletableFuture<ContainerCommandResponseProto> + public abstract XceiverClientAsyncReply sendCommandAsync(ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException; @@ -132,4 +135,7 @@ public abstract class XceiverClientSpi implements Closeable { * @return - {Stand_Alone, Ratis or Chained} */ public abstract HddsProtos.ReplicationType getPipelineType(); + + public abstract void watchForCommit(long index, long timeout) + throws InterruptedException, ExecutionException, TimeoutException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index c1d90a5..04f4cbc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -64,6 
+65,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * Implementation of all container protocol calls performed by Container @@ -163,6 +165,31 @@ public final class ContainerProtocolCalls { } /** + * Calls the container protocol to put a container block. + * + * @param xceiverClient client to perform call + * @param containerBlockData block data to identify container + * @param traceID container protocol call args + * @return putBlockResponse + * @throws Exception if there is an error while performing the call + */ + public static XceiverClientAsyncReply putBlockAsync( + XceiverClientSpi xceiverClient, BlockData containerBlockData, + String traceID) + throws IOException, InterruptedException, ExecutionException { + PutBlockRequestProto.Builder createBlockRequest = + PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerCommandRequestProto request = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) + .setContainerID(containerBlockData.getBlockID().getContainerID()) + .setTraceID(traceID).setDatanodeUuid(id) + .setPutBlock(createBlockRequest).build(); + xceiverClient.sendCommand(request); + return xceiverClient.sendCommandAsync(request); + } + + /** * Calls the container protocol to read a chunk. 
* * @param xceiverClient client to perform call @@ -200,7 +227,7 @@ public final class ContainerProtocolCalls { * @param blockID ID of the block * @param data the data of the chunk to write * @param traceID container protocol call args - * @throws IOException if there is an I/O error while performing the call + * @throws Exception if there is an error while performing the call */ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, ByteString data, String traceID) @@ -224,6 +251,32 @@ public final class ContainerProtocolCalls { } /** + * Calls the container protocol to write a chunk. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to write + * @param blockID ID of the block + * @param data the data of the chunk to write + * @param traceID container protocol call args + * @throws IOException if there is an I/O error while performing the call + */ + public static XceiverClientAsyncReply writeChunkAsync( + XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + ByteString data, String traceID) + throws IOException, ExecutionException, InterruptedException { + WriteChunkRequestProto.Builder writeChunkRequest = + WriteChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunk).setData(data); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerCommandRequestProto request = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk) + .setContainerID(blockID.getContainerID()).setTraceID(traceID) + .setDatanodeUuid(id).setWriteChunk(writeChunkRequest).build(); + return xceiverClient.sendCommandAsync(request); + } + + /** * Allows writing a small file using single RPC. This takes the container * name, block name and data to write sends all that data to the container * using a single RPC. 
This API is designed to be used for files which are @@ -420,7 +473,7 @@ public final class ContainerProtocolCalls { * @param response container protocol call response * @throws IOException if the container protocol call failed */ - private static void validateContainerResponse( + public static void validateContainerResponse( ContainerCommandResponseProto response ) throws StorageContainerException { if (response.getResult() == ContainerProtos.Result.SUCCESS) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 9776817..8a5762f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -112,6 +112,22 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_PROTOCOL = "ozone.client.protocol"; + public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE = + "ozone.client.stream.buffer.flush.size"; + + public static final long OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT = 64; + + public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE = + "ozone.client.stream.buffer.max.size"; + + public static final long OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT = 128; + + public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT = + "ozone.client.watch.request.timeout"; + + public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT = + "30s"; + // This defines the overall connection limit for the connection pool used in // RestClient. 
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX = @@ -192,14 +208,6 @@ public final class OzoneConfigKeys { public static final int OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10; - public static final String OZONE_CLIENT_MAX_RETRIES = - "ozone.client.max.retries"; - public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50; - - public static final String OZONE_CLIENT_RETRY_INTERVAL = - "ozone.client.retry.interval"; - public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms"; - public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 2ffc2ab..54bffd5 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -335,19 +335,29 @@ </description> </property> <property> - <name>ozone.client.max.retries</name> - <value>50</value> + <name>ozone.client.stream.buffer.flush.size</name> + <value>64</value> <tag>OZONE, CLIENT</tag> - <description>Maximum number of retries by Ozone Client on encountering - exception while fetching committed block length. + <description>Size in MB which determines at what buffer position a partial + flush will be initiated during write. It should ideally be a multiple + of chunkSize. 
</description> </property> <property> - <name>ozone.client.retry.interval</name> - <value>200ms</value> + <name>ozone.client.stream.buffer.max.size</name> + <value>128</value> + <tag>OZONE, CLIENT</tag> + <description>Size in mb which determines at what buffer position , + write call be blocked till acknowledgement of the fisrt partial flush + happens by all servers. + </description> + </property> + <property> + <name>ozone.client.watch.request.timeout</name> + <value>30s</value> <tag>OZONE, CLIENT</tag> - <description>Interval between retries by Ozone Client on encountering - exception while fetching committed block length. + <description>Timeout for the watch API in Ratis client to acknowledge + a particular request getting replayed to all servers. </description> </property> <property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index e2e5700..ea0e819 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -121,6 +121,9 @@ public class BlockManagerImpl implements BlockManager { container.updateBlockCommitSequenceId(bcsId); // Increment keycount here container.getContainerData().incrKeyCount(); + LOG.debug( + "Block " + data.getBlockID() + " successfully committed with bcsId " + + bcsId + " chunk size " + data.getChunks().size()); return data.getSize(); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 40e4d83..be1449f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -17,23 +17,14 @@ */ package org.apache.hadoop.ozone.client; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.rest.response.*; import java.util.ArrayList; import java.util.List; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** A utility class for OzoneClient. */ public final class OzoneClientUtils { @@ -94,24 +85,6 @@ public final class OzoneClientUtils { return keyInfo; } - public static RetryPolicy createRetryPolicy(Configuration conf) { - int maxRetryCount = - conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. - OZONE_CLIENT_MAX_RETRIES_DEFAULT); - long retryInterval = conf.getTimeDuration(OzoneConfigKeys. - OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys. - OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - RetryPolicy basePolicy = RetryPolicies - .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval, - TimeUnit.MILLISECONDS); - Map<Class<? 
extends Exception>, RetryPolicy> exceptionToPolicyMap = - new HashMap<Class<? extends Exception>, RetryPolicy>(); - exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy); - RetryPolicy retryPolicy = RetryPolicies - .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, - exceptionToPolicyMap); - return retryPolicy; - } /** * Returns a KeyInfoDetails object constructed using fields of the input * OzoneKeyDetails object. http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 450e2dc..5dbe9f6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -24,11 +24,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -41,18 +40,17 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.ListIterator; +import java.util.concurrent.TimeoutException; /** * Maintaining a list of ChunkInputStream. Write based on offset. @@ -71,7 +69,6 @@ public class ChunkGroupOutputStream extends OutputStream { // array list's get(index) is O(1) private final ArrayList<ChunkOutputStreamEntry> streamEntries; private int currentStreamIndex; - private long byteOffset; private final OzoneManagerProtocolClientSideTranslatorPB omClient; private final StorageContainerLocationProtocolClientSideTranslatorPB scmClient; @@ -81,7 +78,11 @@ public class ChunkGroupOutputStream extends OutputStream { private final int chunkSize; private final String requestID; private boolean closed; - private final RetryPolicy retryPolicy; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; + private ByteBuffer buffer; /** * A constructor for testing purpose only. 
*/ @@ -96,7 +97,11 @@ public class ChunkGroupOutputStream extends OutputStream { chunkSize = 0; requestID = null; closed = false; - retryPolicy = null; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + buffer = ByteBuffer.allocate(1); + watchTimeout = 0; + blockSize = 0; } /** @@ -127,35 +132,54 @@ public class ChunkGroupOutputStream extends OutputStream { new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) .setLength(streamEntry.currentPosition).setOffset(0) .build(); + LOG.debug("block written " + streamEntry.blockID + ", length " + + streamEntry.currentPosition + " bcsID " + streamEntry.blockID + .getBlockCommitSequenceId()); locationInfoList.add(info); } return locationInfoList; } - public ChunkGroupOutputStream( - OpenKeySession handler, XceiverClientManager xceiverClientManager, + public ChunkGroupOutputStream(OpenKeySession handler, + XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, - OzoneManagerProtocolClientSideTranslatorPB omClient, - int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type, RetryPolicy retryPolicy) throws IOException { + OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize, + String requestId, ReplicationFactor factor, ReplicationType type, + long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; - this.byteOffset = 0; this.omClient = omClient; this.scmClient = scmClient; OmKeyInfo info = handler.getKeyInfo(); - this.keyArgs = new OmKeyArgs.Builder() - .setVolumeName(info.getVolumeName()) - .setBucketName(info.getBucketName()) - .setKeyName(info.getKeyName()) - .setType(type) - .setFactor(factor) - .setDataSize(info.getDataSize()).build(); + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + 
.setType(type).setFactor(factor).setDataSize(info.getDataSize()) + .build(); this.openID = handler.getId(); this.xceiverClientManager = xceiverClientManager; this.chunkSize = chunkSize; this.requestID = requestId; - this.retryPolicy = retryPolicy; + this.streamBufferFlushSize = bufferFlushSize * OzoneConsts.MB; + this.streamBufferMaxSize = bufferMaxSize * OzoneConsts.MB; + this.blockSize = size * OzoneConsts.MB; + this.watchTimeout = watchTimeout; + + Preconditions.checkState(chunkSize > 0); + Preconditions.checkState(streamBufferFlushSize > 0); + Preconditions.checkState(streamBufferMaxSize > 0); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); + Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); + Preconditions.checkState(blockSize % streamBufferMaxSize == 0); + + // This byteBuffer will be used to cache data until all the blockCommits + // (putBlock) gets replicated to all/majority servers. The idea here is to + // allocate the buffer of size blockSize so that as and when a chunk is + // is replicated to all servers, as a part of discarding the buffer, we + // don't necessarily need to run compaction(buffer.compact() on the buffer + // to actually discard the acknowledged data. Compaction is inefficient so + // it would be a better choice to avoid compaction on the happy I/O path. 
+ this.buffer = ByteBuffer.allocate((int) blockSize); } /** @@ -191,12 +215,13 @@ public class ChunkGroupOutputStream extends OutputStream { xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength())); + chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, + streamBufferMaxSize, watchTimeout, buffer)); } @VisibleForTesting public long getByteOffset() { - return byteOffset; + return getKeyLength(); } @@ -223,21 +248,23 @@ public class ChunkGroupOutputStream extends OutputStream { public void write(byte[] b, int off, int len) throws IOException { checkNotClosed(); - handleWrite(b, off, len); + handleWrite(b, off, len, false, buffer.position()); } - private void handleWrite(byte[] b, int off, int len) throws IOException { + private void handleWrite(byte[] b, int off, int len, boolean retry, + int pos) throws IOException { if (b == null) { throw new NullPointerException(); } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { return; } int succeededAllocates = 0; + int initialPos; while (len > 0) { if (streamEntries.size() <= currentStreamIndex) { Preconditions.checkNotNull(omClient); @@ -247,8 +274,8 @@ public class ChunkGroupOutputStream extends OutputStream { allocateNewBlock(currentStreamIndex); succeededAllocates += 1; } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " + - "allocated " + succeededAllocates + " blocks for this write."); + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + " blocks for this write."); throw ioe; } } @@ -257,12 +284,19 @@ public class 
ChunkGroupOutputStream extends OutputStream { Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); int writeLen = Math.min(len, (int) current.getRemaining()); + initialPos = pos < buffer.position() ? pos : buffer.position(); try { - current.write(b, off, writeLen); + if (retry) { + current.writeOnRetry(len); + } else { + current.write(b, off, writeLen); + } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe)) { - handleCloseContainerException(current, currentStreamIndex); - continue; + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { + // for the current iteration, current pos - initialPos gives the + // amount of data already written to the buffer + writeLen = buffer.position() - initialPos; + handleException(current, currentStreamIndex); } else { throw ioe; } @@ -274,57 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream { } len -= writeLen; off += writeLen; - byteOffset += writeLen; - } - } - - private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry) - throws IOException { - long blockLength; - ContainerProtos.GetCommittedBlockLengthResponseProto responseProto; - RetryPolicy.RetryAction action; - int numRetries = 0; - while (true) { - try { - responseProto = ContainerProtocolCalls - .getCommittedBlockLength(streamEntry.xceiverClient, - streamEntry.blockID, requestID); - blockLength = responseProto.getBlockLength(); - return blockLength; - } catch (StorageContainerException sce) { - try { - action = retryPolicy.shouldRetry(sce, numRetries, 0, true); - } catch (Exception e) { - throw e instanceof IOException ? (IOException) e : new IOException(e); - } - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - if (action.reason != null) { - LOG.error( - "GetCommittedBlockLength request failed. 
" + action.reason, - sce); - } - throw sce; - } - - // Throw the exception if the thread is interrupted - if (Thread.currentThread().isInterrupted()) { - LOG.warn("Interrupted while trying for connection"); - throw sce; - } - Preconditions.checkArgument( - action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); - try { - Thread.sleep(action.delayMillis); - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException( - "Interrupted: action=" + action + ", retry policy=" + retryPolicy) - .initCause(e); - } - numRetries++; - LOG.trace("Retrying GetCommittedBlockLength request. Already tried " - + numRetries + " time(s); retry policy is " + retryPolicy); - continue; - } } } @@ -373,55 +356,35 @@ public class ChunkGroupOutputStream extends OutputStream { * * @param streamEntry StreamEntry * @param streamIndex Index of the entry - * @throws IOException Throws IOexception if Write fails + * @throws IOException Throws IOException if Write fails */ - private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry, + private void handleException(ChunkOutputStreamEntry streamEntry, int streamIndex) throws IOException { - long committedLength = 0; - ByteBuffer buffer = streamEntry.getBuffer(); - if (buffer == null) { - // the buffer here will be null only when closeContainerException is - // hit while calling putKey during close on chunkOutputStream. - // Since closeContainer auto commit pending keys, no need to do - // anything here. - return; - } - - // update currentStreamIndex in case of closed container exception. The - // current stream entry cannot be used for further writes because - // container is closed. - currentStreamIndex += 1; + int lastSuccessfulFlushIndex = streamEntry.getLastSuccessfulFlushIndex(); + int currentPos = buffer.position(); - // In case where not a single chunk of data has been written to the Datanode - // yet. This block does not yet exist on the datanode but cached on the - // outputStream buffer. 
No need to call GetCommittedBlockLength here - // for this block associated with the stream here. - if (streamEntry.currentPosition >= chunkSize - || streamEntry.currentPosition != buffer.position()) { - committedLength = getCommittedBlockLength(streamEntry); - // update the length of the current stream - streamEntry.currentPosition = committedLength; + // In case of a failure, read the data from the position till the last + // acknowledgement happened. + if (lastSuccessfulFlushIndex > 0) { + buffer.position(lastSuccessfulFlushIndex); + buffer.limit(currentPos); + buffer.compact(); } if (buffer.position() > 0) { + //set the correct length for the current stream + streamEntry.currentPosition = lastSuccessfulFlushIndex; // If the data is still cached in the underlying stream, we need to - // allocate new block and write this data in the datanode. The cached - // data in the buffer does not exceed chunkSize. - Preconditions.checkState(buffer.position() < chunkSize); - // readjust the byteOffset value to the length actually been written. - byteOffset -= buffer.position(); - handleWrite(buffer.array(), 0, buffer.position()); + // allocate new block and write this data in the datanode. + currentStreamIndex += 1; + handleWrite(buffer.array(), 0, buffer.position(), true, + lastSuccessfulFlushIndex); } - // just clean up the current stream. Since the container is already closed, - // it will be auto committed. No need to call close again here. + // just clean up the current stream. streamEntry.cleanup(); - // This case will arise when while writing the first chunk itself fails. - // In such case, the current block associated with the stream has no data - // written. Remove it from the current stream list. 
- if (committedLength == 0) { + if (lastSuccessfulFlushIndex == 0) { streamEntries.remove(streamIndex); - Preconditions.checkArgument(currentStreamIndex != 0); currentStreamIndex -= 1; } // discard subsequent pre allocated blocks from the streamEntries list @@ -430,11 +393,15 @@ public class ChunkGroupOutputStream extends OutputStream { } private boolean checkIfContainerIsClosed(IOException ioe) { - return checkIfContainerNotOpenException(ioe) || Optional.of(ioe.getCause()) - .filter(e -> e instanceof StorageContainerException) - .map(e -> (StorageContainerException) e) - .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO) - .isPresent(); + if (ioe.getCause() != null) { + return checkIfContainerNotOpenException(ioe) || Optional + .of(ioe.getCause()) + .filter(e -> e instanceof StorageContainerException) + .map(e -> (StorageContainerException) e) + .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO) + .isPresent(); + } + return false; } private boolean checkIfContainerNotOpenException(IOException ioe) { @@ -448,6 +415,15 @@ public class ChunkGroupOutputStream extends OutputStream { return false; } + private boolean checkIfTimeoutException(IOException ioe) { + if (ioe.getCause() != null) { + return Optional.of(ioe.getCause()) + .filter(e -> e instanceof TimeoutException).isPresent(); + } else { + return false; + } + } + private long getKeyLength() { return streamEntries.parallelStream().mapToLong(e -> e.currentPosition) .sum(); @@ -495,11 +471,11 @@ public class ChunkGroupOutputStream extends OutputStream { entry.flush(); } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe)) { + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { // This call will allocate a new streamEntry and write the Data. // Close needs to be retried on the newly allocated streamEntry as // as well. 
- handleCloseContainerException(entry, streamIndex); + handleException(entry, streamIndex); handleFlushOrClose(close); } else { throw ioe; @@ -519,16 +495,24 @@ public class ChunkGroupOutputStream extends OutputStream { return; } closed = true; - handleFlushOrClose(true); - if (keyArgs != null) { - // in test, this could be null - removeEmptyBlocks(); - Preconditions.checkState(byteOffset == getKeyLength()); - keyArgs.setDataSize(byteOffset); - keyArgs.setLocationInfoList(getLocationInfoList()); - omClient.commitKey(keyArgs, openID); - } else { - LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); + try { + handleFlushOrClose(true); + if (keyArgs != null) { + // in test, this could be null + removeEmptyBlocks(); + keyArgs.setDataSize(getKeyLength()); + keyArgs.setLocationInfoList(getLocationInfoList()); + omClient.commitKey(keyArgs, openID); + } else { + LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); + } + } catch (IOException ioe) { + throw ioe; + } finally { + if (buffer != null) { + buffer.clear(); + } + buffer = null; } } @@ -544,7 +528,10 @@ public class ChunkGroupOutputStream extends OutputStream { private String requestID; private ReplicationType type; private ReplicationFactor factor; - private RetryPolicy retryPolicy; + private long streamBufferFlushSize; + private long streamBufferMaxSize; + private long blockSize; + private long watchTimeout; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -588,16 +575,31 @@ public class ChunkGroupOutputStream extends OutputStream { return this; } - public ChunkGroupOutputStream build() throws IOException { - return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - omClient, chunkSize, requestID, factor, type, retryPolicy); + public Builder setStreamBufferFlushSize(long size) { + this.streamBufferFlushSize = size; + return this; + } + + public Builder setStreamBufferMaxSize(long size) { + this.streamBufferMaxSize = size; + return this; 
+ } + + public Builder setBlockSize(long size) { + this.blockSize = size; + return this; } - public Builder setRetryPolicy(RetryPolicy rPolicy) { - this.retryPolicy = rPolicy; + public Builder setWatchTimeout(long timeout) { + this.watchTimeout = timeout; return this; } + public ChunkGroupOutputStream build() throws IOException { + return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, + omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, + streamBufferMaxSize, blockSize, watchTimeout); + } } private static class ChunkOutputStreamEntry extends OutputStream { @@ -613,10 +615,16 @@ public class ChunkGroupOutputStream extends OutputStream { // the current position of this stream 0 <= currentPosition < length private long currentPosition; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private ByteBuffer buffer; + ChunkOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length) { + long length, long streamBufferFlushSize, long streamBufferMaxSize, + long watchTimeout, ByteBuffer buffer) { this.outputStream = null; this.blockID = blockID; this.key = key; @@ -627,6 +635,10 @@ public class ChunkGroupOutputStream extends OutputStream { this.length = length; this.currentPosition = 0; + this.streamBufferFlushSize = streamBufferFlushSize; + this.streamBufferMaxSize = streamBufferMaxSize; + this.watchTimeout = watchTimeout; + this.buffer = buffer; } /** @@ -645,6 +657,10 @@ public class ChunkGroupOutputStream extends OutputStream { this.length = length; this.currentPosition = 0; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + buffer = null; + watchTimeout = 0; } long getLength() { @@ -657,9 +673,10 @@ public class ChunkGroupOutputStream extends OutputStream { private void checkStream() { if (this.outputStream == null) { - this.outputStream = new 
ChunkOutputStream(blockID, - key, xceiverClientManager, xceiverClient, - requestId, chunkSize); + this.outputStream = + new ChunkOutputStream(blockID, key, xceiverClientManager, + xceiverClient, requestId, chunkSize, streamBufferFlushSize, + streamBufferMaxSize, watchTimeout, buffer); } } @@ -696,15 +713,21 @@ public class ChunkGroupOutputStream extends OutputStream { } } - ByteBuffer getBuffer() throws IOException { + int getLastSuccessfulFlushIndex() throws IOException { if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; - return out.getBuffer(); + blockID = out.getBlockID(); + return out.getLastSuccessfulFlushIndex(); + } else if (outputStream == null) { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; } throw new IOException("Invalid Output Stream for Key: " + key); } - public void cleanup() { + void cleanup() { checkStream(); if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; @@ -712,6 +735,16 @@ public class ChunkGroupOutputStream extends OutputStream { } } + void writeOnRetry(int len) throws IOException { + checkStream(); + if (this.outputStream instanceof ChunkOutputStream) { + ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + out.writeOnRetry(len); + this.currentPosition += len; + } else { + throw new IOException("Invalid Output Stream for Key: " + key); + } + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 
cbb2e49..826f04b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -24,18 +24,17 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.VolumeArgs; -import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; @@ -72,6 +71,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -94,7 +94,10 @@ public class RpcClient implements ClientProtocol { private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; - private final RetryPolicy retryPolicy; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long blockSize; + private final long watchTimeout; /** * Creates RpcClient instance with the given configuration. 
@@ -135,7 +138,6 @@ public class RpcClient implements ClientProtocol { Client.getRpcTimeout(conf))); this.xceiverClientManager = new XceiverClientManager(conf); - retryPolicy = OzoneClientUtils.createRetryPolicy(conf); int configuredChunkSize = conf.getInt( ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, @@ -149,6 +151,18 @@ public class RpcClient implements ClientProtocol { } else { chunkSize = configuredChunkSize; } + streamBufferFlushSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT); + streamBufferMaxSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT); + blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, + OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT); + watchTimeout = + conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); } private InetSocketAddress getScmAddressForClient() throws IOException { @@ -468,7 +482,10 @@ public class RpcClient implements ClientProtocol { .setRequestID(requestId) .setType(HddsProtos.ReplicationType.valueOf(type.toString())) .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) - .setRetryPolicy(retryPolicy) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setBlockSize(blockSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 15bf8d0..b352e36 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -230,7 +230,10 @@ public interface MiniOzoneCluster { protected Boolean ozoneEnabled = true; protected Boolean randomContainerPort = true; - + protected Optional<Integer> chunkSize = Optional.empty(); + protected Optional<Long> streamBufferFlushSize = Optional.empty(); + protected Optional<Long> streamBufferMaxSize = Optional.empty(); + protected Optional<Long> blockSize = Optional.empty(); // Use relative smaller number of handlers for testing protected int numOfOmHandlers = 20; protected int numOfScmHandlers = 20; @@ -359,6 +362,46 @@ public interface MiniOzoneCluster { } /** + * Sets the chunk size. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setChunkSize(int size) { + chunkSize = Optional.of(size); + return this; + } + + /** + * Sets the flush size for stream buffer. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setStreamBufferFlushSize(long size) { + streamBufferFlushSize = Optional.of(size); + return this; + } + + /** + * Sets the max size for stream buffer. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setStreamBufferMaxSize(long size) { + streamBufferMaxSize = Optional.of(size); + return this; + } + + /** + * Sets the block size for stream buffer. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setBlockSize(long size) { + blockSize = Optional.of(size); + return this; + } + + /** * Constructs and returns MiniOzoneCluster. 
* * @return {@link MiniOzoneCluster} http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 37b6fdc..324e17b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -391,6 +391,25 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { Path metaDir = Paths.get(path, "ozone-meta"); Files.createDirectories(metaDir); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString()); + if (!chunkSize.isPresent()) { + chunkSize = Optional.of(1); + } + if (!streamBufferFlushSize.isPresent()) { + streamBufferFlushSize = Optional.of((long)chunkSize.get()); + } + if (!streamBufferMaxSize.isPresent()) { + streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get()); + } + if (!blockSize.isPresent()) { + blockSize = Optional.of(2 * streamBufferMaxSize.get()); + } + conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, + (int) (chunkSize.get() * OzoneConsts.MB)); + conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + streamBufferFlushSize.get()); + conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + streamBufferMaxSize.get()); + conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, blockSize.get()); configureTrace(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java ---------------------------------------------------------------------- diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 871f389..0197304 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -50,7 +50,7 @@ public interface RatisTestHelper { /** For testing Ozone with Ratis. */ class RatisTestSuite implements Closeable { - static final RpcType RPC = SupportedRpcType.NETTY; + static final RpcType RPC = SupportedRpcType.GRPC; static final int NUM_DATANODES = 3; private final OzoneConfiguration conf; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
