Repository: hadoop Updated Branches: refs/heads/trunk 5e773efd7 -> 1afba83f2
HDDS-870. Avoid creating block sized buffer in ChunkGroupOutputStream. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1afba83f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1afba83f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1afba83f Branch: refs/heads/trunk Commit: 1afba83f2cd74ae54b558101d235f237ccf454c0 Parents: 5e773ef Author: Jitendra Pandey <[email protected]> Authored: Sat Dec 8 09:23:10 2018 -0800 Committer: Jitendra Pandey <[email protected]> Committed: Sat Dec 8 09:23:10 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 6 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 110 +++-- .../hdds/scm/storage/ChunkOutputStream.java | 426 ++++++++++++------- .../hdds/scm/XceiverClientAsyncReply.java | 44 +- .../hadoop/hdds/scm/XceiverClientSpi.java | 19 +- .../scm/storage/ContainerProtocolCalls.java | 1 - .../common/src/main/resources/ozone-default.xml | 8 +- .../server/ratis/DispatcherContext.java | 4 +- .../ozone/client/io/ChunkGroupOutputStream.java | 119 +++--- .../rpc/TestCloseContainerHandlingByClient.java | 19 +- .../client/rpc/TestFailureHandlingByClient.java | 56 ++- .../hadoop/ozone/om/TestChunkStreams.java | 78 ---- 12 files changed, 505 insertions(+), 385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index a824c29..8bdbd1e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -290,12 +290,16 @@ public class XceiverClientGrpc extends XceiverClientSpi { } @Override - public void watchForCommit(long index, long timeout) + public long watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, IOException { // there is no notion of watch for commit index in standalone pipeline + return 0; }; + public long getReplicatedMinCommitIndex() { + return 0; + } /** * Returns pipeline Type. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 28d3e7a..b1a70c0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hdds.scm; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.retry.RetryPolicy; @@ -42,15 +44,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Objects; -import java.util.Collection; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import 
java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; /** * An abstract implementation of {@link XceiverClientSpi} using Ratis. @@ -79,6 +80,12 @@ public final class XceiverClientRatis extends XceiverClientSpi { private final int maxOutstandingRequests; private final RetryPolicy retryPolicy; + // Map to track commit index at every server + private final ConcurrentHashMap<String, Long> commitInfoMap; + + // create a separate RaftClient for watchForCommit API + private RaftClient watchClient; + /** * Constructs a client. */ @@ -89,6 +96,30 @@ public final class XceiverClientRatis extends XceiverClientSpi { this.rpcType = rpcType; this.maxOutstandingRequests = maxOutStandingChunks; this.retryPolicy = retryPolicy; + commitInfoMap = new ConcurrentHashMap<>(); + watchClient = null; + } + + private void updateCommitInfosMap( + Collection<RaftProtos.CommitInfoProto> commitInfoProtos) { + // if the commitInfo map is empty, just update the commit indexes for each + // of the servers + if (commitInfoMap.isEmpty()) { + commitInfoProtos.forEach(proto -> commitInfoMap + .put(proto.getServer().getAddress(), proto.getCommitIndex())); + // In case the commit is happening 2 way, just update the commitIndex + // for the servers which have been successfully updating the commit + // indexes. This is important because getReplicatedMinCommitIndex() + // should always return the min commit index out of the nodes which have + // been replicating data successfully. 
+ } else { + commitInfoProtos.forEach(proto -> commitInfoMap + .computeIfPresent(proto.getServer().getAddress(), + (address, index) -> { + index = proto.getCommitIndex(); + return index; + })); + } } /** @@ -125,6 +156,9 @@ public final class XceiverClientRatis extends XceiverClientSpi { if (c != null) { closeRaftClient(c); } + if (watchClient != null) { + closeRaftClient(watchClient); + } } private void closeRaftClient(RaftClient raftClient) { @@ -148,39 +182,73 @@ public final class XceiverClientRatis extends XceiverClientSpi { getClient().sendAsync(() -> byteString); } + // gets the minimum log index replicated to all servers + @Override + public long getReplicatedMinCommitIndex() { + OptionalLong minIndex = + commitInfoMap.values().parallelStream().mapToLong(v -> v).min(); + return minIndex.isPresent() ? minIndex.getAsLong() : 0; + } + + private void getFailedServer( + Collection<RaftProtos.CommitInfoProto> commitInfos) { + for (RaftProtos.CommitInfoProto proto : commitInfos) { + + } + } + @Override - public void watchForCommit(long index, long timeout) + public long watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, IOException { + long commitIndex = getReplicatedMinCommitIndex(); + if (commitIndex >= index) { + // return the min commit index till which the log has been replicated to + // all servers + return commitIndex; + } LOG.debug("commit index : {} watch timeout : {}", index, timeout); // create a new RaftClient instance for watch request - RaftClient raftClient = - RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); - CompletableFuture<RaftClientReply> replyFuture = raftClient + if (watchClient == null) { + watchClient = + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + } + CompletableFuture<RaftClientReply> replyFuture = watchClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); + RaftClientReply reply; try { - replyFuture.get(timeout, 
TimeUnit.MILLISECONDS); + reply = replyFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException toe) { LOG.warn("3 way commit failed ", toe); - closeRaftClient(raftClient); + closeRaftClient(watchClient); // generate a new raft client instance again so that next watch request // does not get blocked for the previous one // TODO : need to remove the code to create the new RaftClient instance // here once the watch request bypassing sliding window in Raft Client // gets fixed. - raftClient = + watchClient = RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); - raftClient + reply = watchClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); - LOG.info("Could not commit " + index + " to all the nodes." - + "Committed by majority."); - } finally { - closeRaftClient(raftClient); + Optional<RaftProtos.CommitInfoProto> + proto = reply.getCommitInfos().stream().min(Comparator.comparing( + RaftProtos.CommitInfoProto :: getCommitIndex)); + Preconditions.checkState(proto.isPresent()); + String address = proto.get().getServer().getAddress(); + // since 3 way commit has failed, the updated map from now on will + // only store entries for those datanodes which have had successful + // replication. + commitInfoMap.remove(address); + LOG.info( + "Could not commit " + index + " to all the nodes. Server " + address + + " has failed" + "Committed by majority."); } + return index; } + /** * Sends a given command to server gets a waitable future back. 
* @@ -193,8 +261,6 @@ public final class XceiverClientRatis extends XceiverClientSpi { XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null); CompletableFuture<RaftClientReply> raftClientReply = sendRequestAsync(request); - Collection<XceiverClientAsyncReply.CommitInfo> commitInfos = - new ArrayList<>(); CompletableFuture<ContainerCommandResponseProto> containerCommandResponse = raftClientReply.whenComplete((reply, e) -> LOG.debug( "received reply {} for request: cmdType={} containerID={}" @@ -212,14 +278,10 @@ public final class XceiverClientRatis extends XceiverClientSpi { ContainerCommandResponseProto response = ContainerCommandResponseProto .parseFrom(reply.getMessage().getContent()); - reply.getCommitInfos().forEach(e -> { - XceiverClientAsyncReply.CommitInfo commitInfo = - new XceiverClientAsyncReply.CommitInfo( - e.getServer().getAddress(), e.getCommitIndex()); - commitInfos.add(commitInfo); - asyncReply.setCommitInfos(commitInfos); + if (response.getResult() == ContainerProtos.Result.SUCCESS) { + updateCommitInfosMap(reply.getCommitInfos()); asyncReply.setLogIndex(reply.getLogIndex()); - }); + } return response; } catch (InvalidProtocolBufferException e) { throw new CompletionException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 85f8646..6e2ca59 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -37,15 +37,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; +import 
java.nio.Buffer; import java.nio.ByteBuffer; import java.util.UUID; import java.util.List; import java.util.ArrayList; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; +import java.util.concurrent.*; + import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls .putBlockAsync; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls @@ -84,25 +82,30 @@ public class ChunkOutputStream extends OutputStream { private final long streamBufferFlushSize; private final long streamBufferMaxSize; private final long watchTimeout; - private ByteBuffer buffer; + private List<ByteBuffer> bufferList; // The IOException will be set by response handling thread in case there is an // exception received in the response. If the exception is set, the next // request will fail upfront. private IOException ioException; private ExecutorService responseExecutor; - // position of the buffer where the last flush was attempted - private int lastFlushPos; + // the effective length of data flushed so far + private long totalDataFlushedLength; + + // effective data write attempted so far for the block + private long writtenDataLength; - // position of the buffer till which the flush was successfully - // acknowledged by all nodes in pipeline - private int lastSuccessfulFlushIndex; + // total data which has been successfully flushed and acknowledged + // by all servers + private long totalAckDataLength; // list to hold up all putBlock futures private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> futureList; - // list maintaining commit indexes for putBlocks - private List<Long> commitIndexList; + // map containing mapping for putBlock logIndex to flushedDataLength. 
+ private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap; + + private int currentBufferIndex; /** * Creates a new ChunkOutputStream. @@ -113,12 +116,17 @@ public class ChunkOutputStream extends OutputStream { * @param xceiverClient client to perform container calls * @param traceID container protocol call args * @param chunkSize chunk size + * @param bufferList list of byte buffers + * @param streamBufferFlushSize flush size + * @param streamBufferMaxSize max size of the currentBuffer + * @param watchTimeout watch timeout + * @param checksum checksum */ public ChunkOutputStream(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String traceID, int chunkSize, long streamBufferFlushSize, - long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer, - Checksum checksum) { + long streamBufferMaxSize, long watchTimeout, + List<ByteBuffer> bufferList, Checksum checksum) { this.blockID = blockID; this.key = key; this.traceID = traceID; @@ -135,24 +143,36 @@ public class ChunkOutputStream extends OutputStream { this.streamBufferFlushSize = streamBufferFlushSize; this.streamBufferMaxSize = streamBufferMaxSize; this.watchTimeout = watchTimeout; - this.buffer = buffer; - this.ioException = null; + this.bufferList = bufferList; this.checksum = checksum; // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); - commitIndexList = new ArrayList<>(); - lastSuccessfulFlushIndex = 0; + commitIndex2flushedDataMap = new ConcurrentHashMap<>(); + totalAckDataLength = 0; futureList = new ArrayList<>(); - lastFlushPos = 0; + totalDataFlushedLength = 0; + currentBufferIndex = 0; + writtenDataLength = 0; } public BlockID getBlockID() { return blockID; } - public int getLastSuccessfulFlushIndex() { - return lastSuccessfulFlushIndex; + public long getTotalSuccessfulFlushedData() { + return totalAckDataLength; + } + + public long getWrittenDataLength() { + 
return writtenDataLength; + } + + private long computeBufferData() { + int dataLength = + bufferList.stream().mapToInt(Buffer::position).sum(); + Preconditions.checkState(dataLength <= streamBufferMaxSize); + return dataLength; } @@ -176,139 +196,172 @@ public class ChunkOutputStream extends OutputStream { if (len == 0) { return; } - checkOpen(); while (len > 0) { + checkOpen(); int writeLen; - writeLen = Math.min(chunkSize - buffer.position() % chunkSize, len); - buffer.put(b, off, writeLen); - if (buffer.position() % chunkSize == 0) { - int pos = buffer.position() - chunkSize; - int limit = buffer.position(); + allocateBuffer(); + ByteBuffer currentBuffer = getCurrentBuffer(); + writeLen = + Math.min(chunkSize - currentBuffer.position() % chunkSize, len); + currentBuffer.put(b, off, writeLen); + if (currentBuffer.position() % chunkSize == 0) { + int pos = currentBuffer.position() - chunkSize; + int limit = currentBuffer.position(); writeChunk(pos, limit); } off += writeLen; len -= writeLen; - if (buffer.position() >= streamBufferFlushSize - && buffer.position() % streamBufferFlushSize == 0) { - - lastFlushPos = buffer.position(); - futureList.add(handlePartialFlush()); + writtenDataLength += writeLen; + if (currentBuffer.position() == streamBufferFlushSize) { + totalDataFlushedLength += streamBufferFlushSize; + handlePartialFlush(); } - if (buffer.position() >= streamBufferMaxSize - && buffer.position() % streamBufferMaxSize == 0) { + long bufferedData = computeBufferData(); + // Data in the bufferList can not exceed streamBufferMaxSize + if (bufferedData == streamBufferMaxSize) { handleFullBuffer(); } } } + private ByteBuffer getCurrentBuffer() { + ByteBuffer buffer = bufferList.get(currentBufferIndex); + if (!buffer.hasRemaining()) { + currentBufferIndex = + currentBufferIndex < getMaxNumBuffers() - 1 ? 
++currentBufferIndex : + 0; + } + return bufferList.get(currentBufferIndex); + } + + private int getMaxNumBuffers() { + return (int)(streamBufferMaxSize/streamBufferFlushSize); + } + + private void allocateBuffer() { + for (int i = bufferList.size(); i < getMaxNumBuffers(); i++) { + bufferList.add(ByteBuffer.allocate((int)streamBufferFlushSize)); + } + } + /** * Will be called on the retryPath in case closedContainerException/ * TimeoutException. * @param len length of data to write - * @throws IOException if error occured + * @throws IOException if error occurred */ - // In this case, the data is already cached in the buffer. - public void writeOnRetry(int len) throws IOException { + // In this case, the data is already cached in the currentBuffer. + public void writeOnRetry(long len) throws IOException { if (len == 0) { return; } int off = 0; - checkOpen(); + int pos = off; while (len > 0) { - int writeLen; + long writeLen; writeLen = Math.min(chunkSize, len); if (writeLen == chunkSize) { - int pos = off; int limit = pos + chunkSize; writeChunk(pos, limit); } off += writeLen; len -= writeLen; + writtenDataLength += writeLen; if (off % streamBufferFlushSize == 0) { - lastFlushPos = off; - futureList.add(handlePartialFlush()); + // reset the position to zero as now we wll readng thhe next buffer in + // the list + pos = 0; + totalDataFlushedLength += streamBufferFlushSize; + handlePartialFlush(); } - if (off % streamBufferMaxSize == 0) { + if (computeBufferData() % streamBufferMaxSize == 0) { handleFullBuffer(); } } } - private void handleResponse( - ContainerProtos.ContainerCommandResponseProto response, - XceiverClientAsyncReply asyncReply) { - validateResponse(response); - discardBuffer(asyncReply); - } - - private void discardBuffer(XceiverClientAsyncReply asyncReply) { - if (!commitIndexList.isEmpty()) { - long index = commitIndexList.get(0); - if (checkIfBufferDiscardRequired(asyncReply, index)) { - updateFlushIndex(); - } - } - } - /** - * just update the 
lastSuccessfulFlushIndex. Since we have allocated - * the buffer more than the streamBufferMaxSize, we can keep on writing - * to the buffer. In case of failure, we will read the data starting from - * lastSuccessfulFlushIndex. + * just update the totalAckDataLength. Since we have allocated + * the currentBuffer more than the streamBufferMaxSize, we can keep on writing + * to the currentBuffer. In case of failure, we will read the data starting + * from totalAckDataLength. */ - private void updateFlushIndex() { - lastSuccessfulFlushIndex += streamBufferFlushSize; - LOG.debug("Discarding buffer till pos " + lastSuccessfulFlushIndex); - if (!commitIndexList.isEmpty()) { - commitIndexList.remove(0); + private void updateFlushIndex(long index) { + if (!commitIndex2flushedDataMap.isEmpty()) { + Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index)); + totalAckDataLength = commitIndex2flushedDataMap.remove(index); + LOG.debug("Total data successfully replicated: " + totalAckDataLength); futureList.remove(0); - } - - } - /** - * Check if the last commitIndex stored at the beginning of the - * commitIndexList is less than equal to current commitInfo indexes. - * If its true, the buffer has been successfully flushed till the - * last position where flush happened. - */ - private boolean checkIfBufferDiscardRequired( - XceiverClientAsyncReply asyncReply, long commitIndex) { - if (asyncReply.getCommitInfos() != null) { - for (XceiverClientAsyncReply.CommitInfo info : asyncReply - .getCommitInfos()) { - if (info.getCommitIndex() < commitIndex) { - return false; - } + // Flush has been committed to required servers successful. + // just swap the bufferList head and tail after clearing. 
+ ByteBuffer currentBuffer = bufferList.remove(0); + currentBuffer.clear(); + if (currentBufferIndex != 0) { + currentBufferIndex--; } + bufferList.add(currentBuffer); } - return true; } /** - * This is a blocking call.It will wait for the flush till the commit index - * at the head of the commitIndexList gets replicated to all or majority. + * This is a blocking call. It will wait for the flush till the commit index + * at the head of the commitIndex2flushedDataMap gets replicated to all or + * majority. * @throws IOException */ private void handleFullBuffer() throws IOException { - if (!commitIndexList.isEmpty()) { - watchForCommit(commitIndexList.get(0)); + try { + checkOpen(); + if (!futureList.isEmpty()) { + waitOnFlushFutures(); + } + } catch (InterruptedException | ExecutionException e) { + adjustBuffersOnException(); + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + if (!commitIndex2flushedDataMap.isEmpty()) { + watchForCommit( + commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v) + .min().getAsLong()); } } + private void adjustBuffers(long commitIndex) { + commitIndex2flushedDataMap.keySet().stream().forEach(index -> { + if (index <= commitIndex) { + updateFlushIndex(index); + } else { + return; + } + }); + } + + // It may happen that once the exception is encountered , we still might + // have successfully flushed up to a certain index. Make sure the buffers + // only contain data which have not been sufficiently replicated + private void adjustBuffersOnException() { + adjustBuffers(xceiverClient.getReplicatedMinCommitIndex()); + } + /** * calls watchForCommit API of the Ratis Client. For Standalone client, * it is a no op. 
* @param commitIndex log index to watch for + * @return minimum commit index replicated to all nodes * @throws IOException IOException in case watch gets timed out */ private void watchForCommit(long commitIndex) throws IOException { checkOpen(); - Preconditions.checkState(!commitIndexList.isEmpty()); + Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty()); try { - xceiverClient.watchForCommit(commitIndex, watchTimeout); + long index = + xceiverClient.watchForCommit(commitIndex, watchTimeout); + adjustBuffers(index); } catch (TimeoutException | InterruptedException | ExecutionException e) { LOG.warn("watchForCommit failed for index " + commitIndex, e); + adjustBuffersOnException(); throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } @@ -317,68 +370,79 @@ public class ChunkOutputStream extends OutputStream { private CompletableFuture<ContainerProtos. ContainerCommandResponseProto> handlePartialFlush() throws IOException { + checkOpen(); + long flushPos = totalDataFlushedLength; String requestId = traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID; + CompletableFuture<ContainerProtos. 
+ ContainerCommandResponseProto> flushFuture; try { XceiverClientAsyncReply asyncReply = putBlockAsync(xceiverClient, containerBlockData.build(), requestId); CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future = asyncReply.getResponse(); - - return future.thenApplyAsync(e -> { - handleResponse(e, asyncReply); + flushFuture = future.thenApplyAsync(e -> { + try { + validateResponse(e); + } catch (IOException sce) { + future.completeExceptionally(sce); + return e; + } // if the ioException is not set, putBlock is successful if (ioException == null) { LOG.debug( - "Adding index " + asyncReply.getLogIndex() + " commitList size " - + commitIndexList.size()); + "Adding index " + asyncReply.getLogIndex() + " commitMap size " + + commitIndex2flushedDataMap.size()); BlockID responseBlockID = BlockID.getFromProtobuf( e.getPutBlock().getCommittedBlockLength().getBlockID()); Preconditions.checkState(blockID.getContainerBlockID() .equals(responseBlockID.getContainerBlockID())); // updates the bcsId of the block blockID = responseBlockID; - long index = asyncReply.getLogIndex(); // for standalone protocol, logIndex will always be 0. 
- if (index != 0) { - commitIndexList.add(index); - } else { - updateFlushIndex(); - } + commitIndex2flushedDataMap.put(asyncReply.getLogIndex(), flushPos); } return e; - }, responseExecutor); + }, responseExecutor).exceptionally(e -> { + LOG.debug( + "putBlock failed for blockID " + blockID + " with exception " + e + .getLocalizedMessage()); + CompletionException ce = new CompletionException(e); + setIoException(ce); + throw ce; + }); } catch (IOException | InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } + futureList.add(flushFuture); + return flushFuture; } @Override public void flush() throws IOException { if (xceiverClientManager != null && xceiverClient != null - && buffer != null) { + && bufferList != null) { checkOpen(); - if (buffer.position() > 0 && lastSuccessfulFlushIndex != buffer - .position()) { + int bufferSize = bufferList.size(); + if (bufferSize > 0) { try { - - // flush the last chunk data residing on the buffer - if (buffer.position() % chunkSize > 0) { - int pos = buffer.position() - (buffer.position() % chunkSize); - writeChunk(pos, buffer.position()); - } - if (lastFlushPos != buffer.position()) { - lastFlushPos = buffer.position(); + // flush the last chunk data residing on the currentBuffer + if (totalDataFlushedLength < writtenDataLength) { + ByteBuffer currentBuffer = getCurrentBuffer(); + int pos = currentBuffer.position() - (currentBuffer.position() + % chunkSize); + int limit = currentBuffer.position() - pos; + writeChunk(pos, currentBuffer.position()); + totalDataFlushedLength += limit; handlePartialFlush(); } - CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( - futureList.toArray(new CompletableFuture[futureList.size()])); - combinedFuture.get(); + waitOnFlushFutures(); // just check again if the exception is hit while waiting for the // futures to ensure flush has indeed succeeded checkOpen(); } catch (InterruptedException | 
ExecutionException e) { + adjustBuffersOnException(); throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } @@ -388,11 +452,11 @@ public class ChunkOutputStream extends OutputStream { private void writeChunk(int pos, int limit) throws IOException { // Please note : We are not flipping the slice when we write since - // the slices are pointing the buffer start and end as needed for + // the slices are pointing the currentBuffer start and end as needed for // the chunk write. Also please note, Duplicate does not create a // copy of data, it only creates metadata that points to the data // stream. - ByteBuffer chunk = buffer.duplicate(); + ByteBuffer chunk = bufferList.get(currentBufferIndex).duplicate(); chunk.position(pos); chunk.limit(limit); writeChunkToContainer(chunk); @@ -401,49 +465,78 @@ public class ChunkOutputStream extends OutputStream { @Override public void close() throws IOException { if (xceiverClientManager != null && xceiverClient != null - && buffer != null) { - try { - if (buffer.position() > lastFlushPos) { - int pos = buffer.position() - (buffer.position() % chunkSize); - writeChunk(pos, buffer.position()); - futureList.add(handlePartialFlush()); - } - CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( - futureList.toArray(new CompletableFuture[futureList.size()])); - - // wait for all the transactions to complete - combinedFuture.get(); - - // irrespective of whether the commitIndexList is empty or not, - // ensure there is no exception set(For Standalone Protocol) - checkOpen(); - if (!commitIndexList.isEmpty()) { - // wait for the last commit index in the commitIndexList to get - // committed to all or majority of nodes in case timeout happens. 
- long lastIndex = commitIndexList.get(commitIndexList.size() - 1); - LOG.debug( - "waiting for last flush Index " + lastIndex + " to catch up"); - watchForCommit(lastIndex); - updateFlushIndex(); + && bufferList != null) { + int bufferSize = bufferList.size(); + if (bufferSize > 0) { + try { + // flush the last chunk data residing on the currentBuffer + if (totalDataFlushedLength < writtenDataLength) { + ByteBuffer currentBuffer = getCurrentBuffer(); + int pos = currentBuffer.position() - (currentBuffer.position() + % chunkSize); + int limit = currentBuffer.position() - pos; + writeChunk(pos, currentBuffer.position()); + totalDataFlushedLength += limit; + handlePartialFlush(); + } + waitOnFlushFutures(); + // irrespective of whether the commitIndex2flushedDataMap is empty + // or not, ensure there is no exception set + checkOpen(); + if (!commitIndex2flushedDataMap.isEmpty()) { + // wait for the last commit index in the commitIndex2flushedDataMap + // to get committed to all or majority of nodes in case timeout + // happens. 
+ long lastIndex = + commitIndex2flushedDataMap.keySet().stream() + .mapToLong(v -> v).max().getAsLong(); + LOG.debug( + "waiting for last flush Index " + lastIndex + " to catch up"); + watchForCommit(lastIndex); + } + } catch (InterruptedException | ExecutionException e) { + adjustBuffersOnException(); + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } finally { + cleanup(); } - } catch (InterruptedException | ExecutionException e) { - throw new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); - } finally { - cleanup(); } + // clear the currentBuffer + bufferList.stream().forEach(ByteBuffer::clear); } - // clear the buffer - buffer.clear(); + } + + private void waitOnFlushFutures() + throws InterruptedException, ExecutionException { + CompletableFuture<Void> combinedFuture = CompletableFuture + .allOf(futureList.toArray(new CompletableFuture[futureList.size()])); + // wait for all the transactions to complete + combinedFuture.get(); } private void validateResponse( - ContainerProtos.ContainerCommandResponseProto responseProto) { + ContainerProtos.ContainerCommandResponseProto responseProto) + throws IOException { try { + // if the ioException is already set, it means a prev request has failed + // just throw the exception. 
The current operation will fail with the + // original error + if (ioException != null) { + throw ioException; + } ContainerProtocolCalls.validateContainerResponse(responseProto); } catch (StorageContainerException sce) { - ioException = new IOException( - "Unexpected Storage Container Exception: " + sce.toString(), sce); + LOG.error("Unexpected Storage Container Exception: ", sce); + setIoException(sce); + throw sce; + } + } + + private void setIoException(Exception e) { + if (ioException != null) { + ioException = new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); } } @@ -457,7 +550,10 @@ public class ChunkOutputStream extends OutputStream { futureList.clear(); } futureList = null; - commitIndexList = null; + if (commitIndex2flushedDataMap != null) { + commitIndex2flushedDataMap.clear(); + } + commitIndex2flushedDataMap = null; responseExecutor.shutdown(); } @@ -471,6 +567,7 @@ public class ChunkOutputStream extends OutputStream { if (xceiverClient == null) { throw new IOException("ChunkOutputStream has been closed."); } else if (ioException != null) { + adjustBuffersOnException(); throw ioException; } } @@ -504,16 +601,27 @@ public class ChunkOutputStream extends OutputStream { CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future = asyncReply.getResponse(); future.thenApplyAsync(e -> { - handleResponse(e, asyncReply); + try { + validateResponse(e); + } catch (IOException sce) { + future.completeExceptionally(sce); + } return e; - }, responseExecutor); + }, responseExecutor).exceptionally(e -> { + LOG.debug( + "writing chunk failed " + chunkInfo.getChunkName() + " blockID " + + blockID + " with exception " + e.getLocalizedMessage()); + CompletionException ce = new CompletionException(e); + setIoException(ce); + throw ce; + }); } catch (IOException | InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } LOG.debug( "writing chunk " + 
chunkInfo.getChunkName() + " blockID " + blockID - + " length " + chunk.remaining()); + + " length " + effectiveChunkSize); containerBlockData.addChunks(chunkInfo); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java index 0d7e1bc..40d97bf 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java @@ -21,8 +21,6 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; - -import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -32,49 +30,13 @@ public class XceiverClientAsyncReply { private CompletableFuture<ContainerCommandResponseProto> response; private Long logIndex; - private Collection<CommitInfo> commitInfos; public XceiverClientAsyncReply( CompletableFuture<ContainerCommandResponseProto> response) { - this(response, 0, null); - } - - public XceiverClientAsyncReply( - CompletableFuture<ContainerCommandResponseProto> response, long index, - Collection<CommitInfo> commitInfos) { - this.commitInfos = commitInfos; - this.logIndex = index; + this.logIndex = (long)0; this.response = response; } - /** - * A class having details about latest commitIndex for each server in the - * Ratis pipeline. For Standalone pipeline, commitInfo will be null. 
- */ - public static class CommitInfo { - - private final String server; - - private final Long commitIndex; - - public CommitInfo(String server, long commitIndex) { - this.server = server; - this.commitIndex = commitIndex; - } - - public String getServer() { - return server; - } - - public long getCommitIndex() { - return commitIndex; - } - } - - public Collection<CommitInfo> getCommitInfos() { - return commitInfos; - } - public CompletableFuture<ContainerCommandResponseProto> getResponse() { return response; } @@ -83,10 +45,6 @@ public class XceiverClientAsyncReply { return logIndex; } - public void setCommitInfos(Collection<CommitInfo> commitInfos) { - this.commitInfos = commitInfos; - } - public void setLogIndex(Long logIndex) { this.logIndex = logIndex; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index e9896dc..87cda56 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -125,7 +125,24 @@ public abstract class XceiverClientSpi implements Closeable { */ public abstract HddsProtos.ReplicationType getPipelineType(); - public abstract void watchForCommit(long index, long timeout) + /** + * Check if a specific commitIndex is replicated to majority/all servers.
+ * @param index index to watch for + * @param timeout timeout provided for the watch operation to complete + * @return the min commit index replicated to all or majority servers + * in case of a failure + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + * @throws IOException + */ + public abstract long watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, IOException; + + /** + * returns the min commit index replicated to all servers. + * @return min commit index replicated to all servers. + */ + public abstract long getReplicatedMinCommitIndex(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 8af3973..977a784 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -189,7 +189,6 @@ public final class ContainerProtocolCalls { .setContainerID(containerBlockData.getBlockID().getContainerID()) .setTraceID(traceID).setDatanodeUuid(id) .setPutBlock(createBlockRequest).build(); - xceiverClient.sendCommand(request); return xceiverClient.sendCommandAsync(request); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 2d62baa..2f4179a 100644 --- 
a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -365,8 +365,8 @@ <name>ozone.client.stream.buffer.flush.size</name> <value>64MB</value> <tag>OZONE, CLIENT</tag> - <description>Size in mb which determines at what buffer position , a partial - flush will be initiated during write. It should be ideally a mutiple + <description>Size which determines at what buffer position , a partial + flush will be initiated during write. It should be ideally a multiple of chunkSize. </description> </property> @@ -374,8 +374,8 @@ <name>ozone.client.stream.buffer.max.size</name> <value>128MB</value> <tag>OZONE, CLIENT</tag> - <description>Size in mb which determines at what buffer position , - write call be blocked till acknowledgement of the fisrt partial flush + <description>Size which determines at what buffer position, + write call be blocked till acknowledgement of the first partial flush happens by all servers. </description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java index a46e6b8..4ca99e0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java @@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** - 
* DispatcherContext class holds transport protocol specfic context info + * DispatcherContext class holds transport protocol specific context info * required for execution of container commands over the container dispatcher. */ @InterfaceAudience.Private @@ -121,7 +121,7 @@ public class DispatcherContext { } /** - * Builds and returns DatanodeDetails instance. + * Builds and returns DispatcherContext instance. * * @return DispatcherContext */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 4d76395..bc88255 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -83,8 +83,8 @@ public class ChunkGroupOutputStream extends OutputStream { private final long streamBufferMaxSize; private final long watchTimeout; private final long blockSize; - private ByteBuffer buffer; private final Checksum checksum; + private List<ByteBuffer> bufferList; /** * A constructor for testing purpose only. 
*/ @@ -101,7 +101,9 @@ public class ChunkGroupOutputStream extends OutputStream { closed = false; streamBufferFlushSize = 0; streamBufferMaxSize = 0; - buffer = ByteBuffer.allocate(1); + bufferList = new ArrayList<>(1); + ByteBuffer buffer = ByteBuffer.allocate(1); + bufferList.add(buffer); watchTimeout = 0; blockSize = 0; this.checksum = new Checksum(); @@ -177,15 +179,7 @@ public class ChunkGroupOutputStream extends OutputStream { Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); Preconditions.checkState(blockSize % streamBufferMaxSize == 0); - - // This byteBuffer will be used to cache data until all the blockCommits - // (putBlock) gets replicated to all/majority servers. The idea here is to - // allocate the buffer of size blockSize so that as and when a chunk is - // is replicated to all servers, as a part of discarding the buffer, we - // don't necessarily need to run compaction(buffer.compact() on the buffer - // to actually discard the acknowledged data. Compaction is inefficient so - // it would be a better choice to avoid compaction on the happy I/O path. 
- this.buffer = ByteBuffer.allocate((int) blockSize); + this.bufferList = new ArrayList<>(); } /** @@ -222,12 +216,7 @@ public class ChunkGroupOutputStream extends OutputStream { streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, buffer, checksum)); - } - - @VisibleForTesting - public long getByteOffset() { - return getKeyLength(); + streamBufferMaxSize, watchTimeout, bufferList, checksum)); } @@ -254,11 +243,6 @@ public class ChunkGroupOutputStream extends OutputStream { public void write(byte[] b, int off, int len) throws IOException { checkNotClosed(); - handleWrite(b, off, len, false, buffer.position()); - } - - private void handleWrite(byte[] b, int off, int len, boolean retry, - int pos) throws IOException { if (b == null) { throw new NullPointerException(); } @@ -269,8 +253,17 @@ public class ChunkGroupOutputStream extends OutputStream { if (len == 0) { return; } + handleWrite(b, off, len, false); + } + + private long computeBufferData() { + return bufferList.stream().mapToInt(value -> value.position()) + .sum(); + } + + private void handleWrite(byte[] b, int off, long len, boolean retry) + throws IOException { int succeededAllocates = 0; - int initialPos; while (len > 0) { if (streamEntries.size() <= currentStreamIndex) { Preconditions.checkNotNull(omClient); @@ -289,8 +282,12 @@ public class ChunkGroupOutputStream extends OutputStream { // still do a sanity check. Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); - int writeLen = Math.min(len, (int) current.getRemaining()); - initialPos = pos < buffer.position() ? pos : buffer.position(); + + // length(len) will be in int range if the call is happening through + // write API of chunkOutputStream. 
Length can be in long range if it comes + // via Exception path. + int writeLen = Math.min((int)len, (int) current.getRemaining()); + long currentPos = current.getWrittenDataLength(); try { if (retry) { current.writeOnRetry(len); @@ -299,9 +296,10 @@ public class ChunkGroupOutputStream extends OutputStream { } } catch (IOException ioe) { if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { - // for the current iteration, current pos - initialPos gives the + // for the current iteration, totalDataWritten - currentPos gives the // amount of data already written to the buffer - writeLen = buffer.position() - initialPos; + writeLen = (int) (current.getWrittenDataLength() - currentPos); + LOG.debug("writeLen {}, total len {}", writeLen, len); handleException(current, currentStreamIndex); } else { throw ioe; @@ -366,30 +364,20 @@ public class ChunkGroupOutputStream extends OutputStream { */ private void handleException(ChunkOutputStreamEntry streamEntry, int streamIndex) throws IOException { - int lastSuccessfulFlushIndex = streamEntry.getLastSuccessfulFlushIndex(); - int currentPos = buffer.position(); - - // In case of a failure, read the data from the position till the last - // acknowledgement happened. - if (lastSuccessfulFlushIndex > 0) { - buffer.position(lastSuccessfulFlushIndex); - buffer.limit(currentPos); - buffer.compact(); - } - - if (buffer.position() > 0) { - //set the correct length for the current stream - streamEntry.currentPosition = lastSuccessfulFlushIndex; + long totalSuccessfulFlushedData = + streamEntry.getTotalSuccessfulFlushedData(); + //set the correct length for the current stream + streamEntry.currentPosition = totalSuccessfulFlushedData; + long bufferedDataLen = computeBufferData(); + // just clean up the current stream. + streamEntry.cleanup(); + if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. 
currentStreamIndex += 1; - handleWrite(buffer.array(), 0, buffer.position(), true, - lastSuccessfulFlushIndex); + handleWrite(null, 0, bufferedDataLen, true); } - - // just clean up the current stream. - streamEntry.cleanup(); - if (lastSuccessfulFlushIndex == 0) { + if (totalSuccessfulFlushedData == 0) { streamEntries.remove(streamIndex); currentStreamIndex -= 1; } @@ -433,7 +421,7 @@ public class ChunkGroupOutputStream extends OutputStream { } private long getKeyLength() { - return streamEntries.parallelStream().mapToLong(e -> e.currentPosition) + return streamEntries.stream().mapToLong(e -> e.currentPosition) .sum(); } @@ -517,10 +505,10 @@ public class ChunkGroupOutputStream extends OutputStream { } catch (IOException ioe) { throw ioe; } finally { - if (buffer != null) { - buffer.clear(); + if (bufferList != null) { + bufferList.stream().forEach(e -> e.clear()); } - buffer = null; + bufferList = null; } } @@ -633,13 +621,13 @@ public class ChunkGroupOutputStream extends OutputStream { private final long streamBufferFlushSize; private final long streamBufferMaxSize; private final long watchTimeout; - private ByteBuffer buffer; + private List<ByteBuffer> bufferList; ChunkOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, long length, long streamBufferFlushSize, long streamBufferMaxSize, - long watchTimeout, ByteBuffer buffer, Checksum checksum) { + long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum) { this.outputStream = null; this.blockID = blockID; this.key = key; @@ -653,8 +641,8 @@ public class ChunkGroupOutputStream extends OutputStream { this.streamBufferFlushSize = streamBufferFlushSize; this.streamBufferMaxSize = streamBufferMaxSize; this.watchTimeout = watchTimeout; - this.buffer = buffer; this.checksum = checksum; + this.bufferList = bufferList; } /** @@ -676,7 +664,7 @@ public class ChunkGroupOutputStream extends OutputStream { 
this.currentPosition = 0; streamBufferFlushSize = 0; streamBufferMaxSize = 0; - buffer = null; + bufferList = null; watchTimeout = 0; this.checksum = checksum; } @@ -694,7 +682,7 @@ public class ChunkGroupOutputStream extends OutputStream { this.outputStream = new ChunkOutputStream(blockID, key, xceiverClientManager, xceiverClient, requestId, chunkSize, streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, buffer, checksum); + streamBufferMaxSize, watchTimeout, bufferList, checksum); } } @@ -731,11 +719,24 @@ public class ChunkGroupOutputStream extends OutputStream { } } - int getLastSuccessfulFlushIndex() throws IOException { + long getTotalSuccessfulFlushedData() throws IOException { if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; blockID = out.getBlockID(); - return out.getLastSuccessfulFlushIndex(); + return out.getTotalSuccessfulFlushedData(); + } else if (outputStream == null) { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + throw new IOException("Invalid Output Stream for Key: " + key); + } + + long getWrittenDataLength() throws IOException { + if (this.outputStream instanceof ChunkOutputStream) { + ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + return out.getWrittenDataLength(); } else if (outputStream == null) { // For a pre allocated block for which no write has been initiated, // the OutputStream will be null here. 
@@ -753,7 +754,7 @@ public class ChunkGroupOutputStream extends OutputStream { } } - void writeOnRetry(int len) throws IOException { + void writeOnRetry(long len) throws IOException { checkStream(); if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index d268aeb..6b7276e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -109,6 +109,10 @@ public class TestCloseContainerHandlingByClient { objectStore.getVolume(volumeName).createBucket(bucketName); } + private String getKeyName() { + return UUID.randomUUID().toString(); + } + /** * Shutdown MiniDFSCluster. 
*/ @@ -121,7 +125,7 @@ public class TestCloseContainerHandlingByClient { @Test public void testBlockWritesWithFlushAndClose() throws Exception { - String keyName = "standalone"; + String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); // write data more than 1 chunk byte[] data = ContainerTestHelper @@ -153,7 +157,7 @@ public class TestCloseContainerHandlingByClient { @Test public void testBlockWritesCloseConsistency() throws Exception { - String keyName = "standalone2"; + String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); // write data more than 1 chunk byte[] data = ContainerTestHelper @@ -181,7 +185,7 @@ public class TestCloseContainerHandlingByClient { @Test public void testMultiBlockWrites() throws Exception { - String keyName = "standalone3"; + String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, (4 * blockSize)); ChunkGroupOutputStream groupOutputStream = @@ -227,8 +231,7 @@ public class TestCloseContainerHandlingByClient { @Test public void testMultiBlockWrites2() throws Exception { - String keyName = "ratis2"; - long dataLength; + String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 4 * blockSize); ChunkGroupOutputStream groupOutputStream = @@ -272,7 +275,7 @@ public class TestCloseContainerHandlingByClient { @Test public void testMultiBlockWrites3() throws Exception { - String keyName = "standalone5"; + String keyName = getKeyName(); int keyLen = 4 * blockSize; OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen); ChunkGroupOutputStream groupOutputStream = @@ -391,7 +394,7 @@ public class TestCloseContainerHandlingByClient { // on the datanode. 
@Test public void testDiscardPreallocatedBlocks() throws Exception { - String keyName = "discardpreallocatedblocks"; + String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 2 * blockSize); ChunkGroupOutputStream groupOutputStream = @@ -447,7 +450,7 @@ public class TestCloseContainerHandlingByClient { @Test public void testBlockWriteViaRatis() throws Exception { - String keyName = "ratis"; + String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); byte[] data = ContainerTestHelper .getFixedLengthString(keyString, chunkSize + chunkSize / 2) http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index 98976cf..1ba4820 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -66,10 +66,6 @@ public class TestFailureHandlingByClient { private static int maxRetries; /** - * TODO: we will spawn new MiniOzoneCluster every time for each unit test - * invocation. Need to use the same instance for all tests. - */ - /** * Create a MiniDFSCluster for testing. 
* <p> * Ozone is made active by setting OZONE_ENABLED = true @@ -86,6 +82,11 @@ public class TestFailureHandlingByClient { TimeUnit.SECONDS); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5); + conf.setTimeDuration( + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, + 1, TimeUnit.SECONDS); + conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(6).build(); @@ -112,7 +113,7 @@ public class TestFailureHandlingByClient { @Test public void testBlockWritesWithDnFailures() throws Exception { - String keyName = "ratis3"; + String keyName = UUID.randomUUID().toString(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); byte[] data = ContainerTestHelper @@ -189,6 +190,51 @@ public class TestFailureHandlingByClient { validateData(keyName, data.concat(data).getBytes()); } + @Test + public void testMultiBlockWritesWithIntermittentDnFailures() + throws Exception { + String keyName = UUID.randomUUID().toString(); + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, 6 * blockSize); + String data = ContainerTestHelper + .getFixedLengthString(keyString, blockSize + chunkSize); + key.write(data.getBytes()); + + // get the name of a valid container + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertTrue(locationInfoList.size() == 6); + long containerId = locationInfoList.get(1).getContainerID(); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerId)); + Pipeline pipeline = + 
cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List<DatanodeDetails> datanodes = pipeline.getNodes(); + cluster.shutdownHddsDatanode(datanodes.get(0)); + + // The write will fail but exception will be handled and length will be + // updated correctly in OzoneManager once the stream is closed + key.write(data.getBytes()); + + // shutdown the second datanode + cluster.shutdownHddsDatanode(datanodes.get(1)); + key.write(data.getBytes()); + key.close(); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize()); + validateData(keyName, data.concat(data).concat(data).getBytes()); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, long size) throws Exception { return ContainerTestHelper http://git-wip-us.apache.org/repos/asf/hadoop/blob/1afba83f/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java index 7dd31da..6b91837 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java @@ -18,15 +18,12 @@ package org.apache.hadoop.ozone.om; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import 
org.apache.hadoop.hdds.scm.storage.ChunkInputStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; import java.io.IOException; import java.util.ArrayList; @@ -41,81 +38,9 @@ public class TestChunkStreams { @Rule public ExpectedException exception = ExpectedException.none(); - /** - * This test uses ByteArrayOutputStream as the underlying stream to test - * the correctness of ChunkGroupOutputStream. - * - * @throws Exception - */ - @Test - public void testWriteGroupOutputStream() throws Exception { - try (ChunkGroupOutputStream groupOutputStream = - new ChunkGroupOutputStream()) { - ArrayList<OutputStream> outputStreams = new ArrayList<>(); - - // 5 byte streams, each 100 bytes. write 500 bytes means writing to each - // of them with 100 bytes. - for (int i = 0; i < 5; i++) { - ByteArrayOutputStream out = new ByteArrayOutputStream(100); - outputStreams.add(out); - groupOutputStream.addStream(out, 100); - } - assertEquals(0, groupOutputStream.getByteOffset()); - - String dataString = RandomStringUtils.randomAscii(500); - byte[] data = dataString.getBytes(UTF_8); - groupOutputStream.write(data, 0, data.length); - assertEquals(500, groupOutputStream.getByteOffset()); - - String res = ""; - int offset = 0; - for (OutputStream stream : outputStreams) { - String subString = stream.toString(); - res += subString; - assertEquals(dataString.substring(offset, offset + 100), subString); - offset += 100; - } - assertEquals(dataString, res); - } - } - - @Test - public void testErrorWriteGroupOutputStream() throws Exception { - try (ChunkGroupOutputStream groupOutputStream = - new ChunkGroupOutputStream()) { - ArrayList<OutputStream> outputStreams = new ArrayList<>(); - - // 5 byte streams, each 100 bytes. write 500 bytes means writing to each - // of them with 100 bytes. 
all 5 streams makes up a ChunkGroupOutputStream - // with a total of 500 bytes in size - for (int i = 0; i < 5; i++) { - ByteArrayOutputStream out = new ByteArrayOutputStream(100); - outputStreams.add(out); - groupOutputStream.addStream(out, 100); - } - assertEquals(0, groupOutputStream.getByteOffset()); - - // first writes of 100 bytes should succeed - groupOutputStream.write(RandomStringUtils.randomAscii(100) - .getBytes(UTF_8)); - assertEquals(100, groupOutputStream.getByteOffset()); - - // second writes of 500 bytes should fail, as there should be only 400 - // bytes space left - // TODO : if we decide to take the 400 bytes instead in the future, - // other add more informative error code rather than exception, need to - // change this part. - exception.expect(Exception.class); - groupOutputStream.write(RandomStringUtils.randomAscii(500) - .getBytes(UTF_8)); - assertEquals(100, groupOutputStream.getByteOffset()); - } - } - @Test public void testReadGroupInputStream() throws Exception { try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) { - ArrayList<ChunkInputStream> inputStreams = new ArrayList<>(); String dataString = RandomStringUtils.randomAscii(500); byte[] buf = dataString.getBytes(UTF_8); @@ -157,7 +82,6 @@ public class TestChunkStreams { return readLen; } }; - inputStreams.add(in); offset += 100; groupInputStream.addStream(in, 100); } @@ -173,7 +97,6 @@ public class TestChunkStreams { @Test public void testErrorReadGroupInputStream() throws Exception { try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) { - ArrayList<ChunkInputStream> inputStreams = new ArrayList<>(); String dataString = RandomStringUtils.randomAscii(500); byte[] buf = dataString.getBytes(UTF_8); @@ -215,7 +138,6 @@ public class TestChunkStreams { return readLen; } }; - inputStreams.add(in); offset += 100; groupInputStream.addStream(in, 100); } --------------------------------------------------------------------- To unsubscribe, e-mail: 
[email protected] For additional commands, e-mail: [email protected]
