This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 88c4d59d9b780ad29cbf0bd1f7a3981a3b697c98 Author: hao guo <[email protected]> AuthorDate: Mon Mar 28 16:16:31 2022 +0800 HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195) --- .../hdds/scm/storage/AbstractDataStreamOutput.java | 130 +++++++++++++++++++++ .../hdds/scm/storage/BlockDataStreamOutput.java | 36 +----- .../org/apache/hadoop/hdds/ratis/RatisHelper.java | 32 +++++ .../ozone/container/ContainerTestHelper.java | 12 ++ .../server/ratis/ContainerStateMachine.java | 35 ++++-- .../ozone/container/keyvalue/KeyValueHandler.java | 33 +++--- .../keyvalue/impl/KeyValueStreamDataChannel.java | 2 +- .../client/io/BlockDataStreamOutputEntryPool.java | 26 ----- .../ozone/client/io/KeyDataStreamOutput.java | 121 ++----------------- .../client/rpc/TestBlockDataStreamOutput.java | 4 +- .../rpc/TestContainerStateMachineStream.java | 59 ++++++++-- 11 files changed, 279 insertions(+), 211 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java new file mode 100644 index 0000000000..e29670d781 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.ratis.protocol.exceptions.AlreadyClosedException; +import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Map; + +/** + * This class is used for error handling methods. + */ +public abstract class AbstractDataStreamOutput + implements ByteBufferStreamOutput { + + private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap; + private int retryCount; + private boolean isException; + + protected AbstractDataStreamOutput( + Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap) { + this.retryPolicyMap = retryPolicyMap; + this.isException = false; + this.retryCount = 0; + } + + @VisibleForTesting + public int getRetryCount() { + return retryCount; + } + + protected void resetRetryCount() { + retryCount = 0; + } + + protected boolean isException() { + return isException; + } + + /** + * Checks if the provided exception signifies retry failure in ratis client. + * In case of retry failure, ratis client throws RaftRetryFailureException + * and all succeeding operations are failed with AlreadyClosedException. + */ + protected boolean checkForRetryFailure(Throwable t) { + return t instanceof RaftRetryFailureException + || t instanceof AlreadyClosedException; + } + + // Every container specific exception from datatnode will be seen as + // StorageContainerException + protected boolean checkIfContainerToExclude(Throwable t) { + return t instanceof StorageContainerException; + } + + protected void setExceptionAndThrow(IOException ioe) throws IOException { + isException = true; + throw ioe; + } + + protected void handleRetry(IOException exception) throws IOException { + RetryPolicy retryPolicy = retryPolicyMap + .get(HddsClientUtils.checkForException(exception).getClass()); + if (retryPolicy == null) { + retryPolicy = retryPolicyMap.get(Exception.class); + } + handleRetry(exception, retryPolicy); + } + + protected void handleRetry(IOException exception, RetryPolicy retryPolicy) + throws IOException { + RetryPolicy.RetryAction action = null; + try { + action = retryPolicy.shouldRetry(exception, retryCount, 0, true); + } catch (Exception e) { + setExceptionAndThrow(new IOException(e)); + } + if (action != null && + action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + String msg = ""; + if (action.reason != null) { + msg = "Retry request failed. " + action.reason; + } + setExceptionAndThrow(new IOException(msg, exception)); + } + + // Throw the exception if the thread is interrupted + if (Thread.currentThread().isInterrupted()) { + setExceptionAndThrow(exception); + } + Preconditions.checkNotNull(action); + Preconditions.checkArgument( + action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); + if (action.delayMillis > 0) { + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + IOException ioe = (IOException) new InterruptedIOException( + "Interrupted: action=" + action + ", retry policy=" + retryPolicy) + .initCause(e); + setExceptionAndThrow(ioe); + } + } + retryCount++; + } +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 3df5eb0e12..d5b9dd9d81 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -44,8 +45,6 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.io.StandardWriteOption; import org.apache.ratis.protocol.DataStreamReply; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.RoutingTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,44 +207,13 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { if (isDatastreamPipelineMode) { return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) .stream(message.getContent().asReadOnlyByteBuffer(), - getRoutingTable(pipeline)); + RatisHelper.getRoutingTable(pipeline)); } else { return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) .stream(message.getContent().asReadOnlyByteBuffer()); } } - public RoutingTable getRoutingTable(Pipeline pipeline) { - RaftPeerId primaryId = null; - List<RaftPeerId> raftPeers = new ArrayList<>(); - - for (DatanodeDetails dn : pipeline.getNodes()) { - final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString()); - try { - if (dn == pipeline.getFirstNode()) { - primaryId = raftPeerId; - } - } catch (IOException e) { - LOG.error("Can not get FirstNode from the pipeline: {} with " + - "exception: {}", pipeline.toString(), e.getLocalizedMessage()); - return null; - } - raftPeers.add(raftPeerId); - } - - RoutingTable.Builder builder = RoutingTable.newBuilder(); - RaftPeerId previousId = primaryId; - for (RaftPeerId peerId : raftPeers) { - if (peerId.equals(primaryId)) { - continue; - } - builder.addSuccessor(previousId, peerId); - previousId = peerId; - } - - return builder.build(); - } - public BlockID getBlockID() { return blockID.get(); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index c50a184285..e431c67df7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -55,6 +55,7 @@ import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.RoutingTable; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; @@ -425,4 +426,35 @@ public final class RatisHelper { throw new RuntimeException(e); } } + + public static RoutingTable getRoutingTable(Pipeline pipeline) { + RaftPeerId primaryId = null; + List<RaftPeerId> raftPeers = new ArrayList<>(); + + for (DatanodeDetails dn : pipeline.getNodes()) { + final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString()); + try { + if (dn == pipeline.getFirstNode()) { + primaryId = raftPeerId; + } + } catch (IOException e) { + LOG.error("Can not get FirstNode from the pipeline: {} with " + + "exception: {}", pipeline.toString(), e.getLocalizedMessage()); + return null; + } + raftPeers.add(raftPeerId); + } + + RoutingTable.Builder builder = RoutingTable.newBuilder(); + RaftPeerId previousId = primaryId; + for (RaftPeerId peerId : raftPeers) { + if (peerId.equals(primaryId)) { + continue; + } + builder.addSuccessor(previousId, peerId); + previousId = peerId; + } + + return builder.build(); + } } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index db8943f2a8..3ad5968482 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -599,6 +599,18 @@ public final class ContainerTestHelper { return String.format("%1$" + length + "s", string); } + public static byte[] generateData(int length, boolean random) { + final byte[] data = new byte[length]; + if (random) { + ThreadLocalRandom.current().nextBytes(data); + } else { + for (int i = 0; i < length; i++) { + data[i] = (byte) i; + } + } + return data; + } + /** * Construct fake protobuf messages for various types of requests. * This is tedious, however necessary to test. Protobuf classes are final diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 3ef9477d97..916d3e7f5b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.util.Time; @@ -549,19 +550,29 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) { - return CompletableFuture.supplyAsync(() -> { - if (stream == null) { - return JavaUtils.completeExceptionally( - new IllegalStateException("DataStream is null")); - } - if (stream.getDataChannel().isOpen()) { - return JavaUtils.completeExceptionally( - new IllegalStateException( - "DataStream: " + stream + " is not closed properly")); - } else { - return CompletableFuture.completedFuture(null); + if (stream == null) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "DataStream is null")); + } + final DataChannel dataChannel = stream.getDataChannel(); + if (dataChannel.isOpen()) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "DataStream: " + stream + " is not closed properly")); + } + + final CompletableFuture<ContainerCommandResponseProto> f; + if (dataChannel instanceof KeyValueStreamDataChannel) { + f = CompletableFuture.completedFuture(null); + } else { + return JavaUtils.completeExceptionally(new IllegalStateException( + "Unexpected DataChannel " + dataChannel.getClass())); + } + return f.whenComplete((res, e) -> { + if (LOG.isDebugEnabled()) { + LOG.debug("PutBlock {} Term: {} Index: {}", + res.getResult(), entry.getTerm(), entry.getIndex()); } - }, executor); + }); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index e94f755234..501e18f72d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -182,13 +182,21 @@ public class KeyValueHandler extends Handler { @Override public StateMachine.DataChannel getStreamDataChannel( - Container container, ContainerCommandRequestProto msg) - throws StorageContainerException { + Container container, ContainerCommandRequestProto msg) + throws StorageContainerException { KeyValueContainer kvContainer = (KeyValueContainer) container; checkContainerOpen(kvContainer); - BlockID blockID = BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID()); - return chunkManager.getStreamDataChannel(kvContainer, - blockID, metrics); + + if (msg.hasWriteChunk()) { + BlockID blockID = + BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID()); + + return chunkManager.getStreamDataChannel(kvContainer, + blockID, metrics); + } else { + throw new StorageContainerException("Malformed request.", + ContainerProtos.Result.IO_EXCEPTION); + } } @Override @@ -271,10 +279,14 @@ public class KeyValueHandler extends Handler { ContainerCommandResponseProto handleStreamInit( ContainerCommandRequestProto request, KeyValueContainer kvContainer, DispatcherContext dispatcherContext) { - if (!request.hasWriteChunk()) { + final BlockID blockID; + if (request.hasWriteChunk()) { + WriteChunkRequestProto writeChunk = request.getWriteChunk(); + blockID = BlockID.getFromProtobuf(writeChunk.getBlockID()); + } else { if (LOG.isDebugEnabled()) { - LOG.debug("Malformed Write Chunk request. trace ID: {}", - request.getTraceID()); + LOG.debug("Malformed {} request. trace ID: {}", + request.getCmdType(), request.getTraceID()); } return malformedRequest(request); } @@ -282,13 +294,8 @@ public class KeyValueHandler extends Handler { String path = null; try { checkContainerOpen(kvContainer); - - WriteChunkRequestProto writeChunk = request.getWriteChunk(); - BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID()); - path = chunkManager .streamInit(kvContainer, blockID); - } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 14ead4ea86..66723031f0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -28,7 +28,7 @@ import java.io.File; /** * This class is used to get the DataChannel for streaming. */ -class KeyValueStreamDataChannel extends StreamDataChannelBase { +public class KeyValueStreamDataChannel extends StreamDataChannelBase { KeyValueStreamDataChannel(File file, ContainerData containerData, ContainerMetrics metrics) throws StorageContainerException { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index 00cda7844a..e51242cc10 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -18,10 +18,8 @@ */ package org.apache.hadoop.ozone.client.io; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -88,30 +86,6 @@ public class BlockDataStreamOutputEntryPool { this.bufferList = new ArrayList<>(); } - /** - * A constructor for testing purpose only. - * - * @see KeyDataStreamOutput#KeyDataStreamOutput() - */ - @VisibleForTesting - BlockDataStreamOutputEntryPool() { - streamEntries = new ArrayList<>(); - omClient = null; - keyArgs = null; - xceiverClientFactory = null; - config = - new OzoneConfiguration().getObject(OzoneClientConfig.class); - config.setStreamBufferSize(0); - config.setStreamBufferMaxSize(0); - config.setStreamBufferFlushSize(0); - config.setStreamBufferFlushDelay(false); - requestID = null; - int chunkSize = 0; - currentStreamIndex = 0; - openID = -1; - excludeList = new ExcludeList(); - } - /** * When a key is opened, it is possible that there are some blocks already * allocated to it for this open session. In this case, to make use of these diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index 2540e42e24..dc5c3a016d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -28,31 +28,22 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; -import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.hdds.scm.storage.AbstractDataStreamOutput; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.ratis.protocol.exceptions.AlreadyClosedException; -import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; /** * Maintaining a list of BlockInputStream. Write based on offset. @@ -63,7 +54,7 @@ import java.util.stream.Collectors; * * TODO : currently not support multi-thread access. */ -public class KeyDataStreamOutput implements ByteBufferStreamOutput { +public class KeyDataStreamOutput extends AbstractDataStreamOutput { private OzoneClientConfig config; @@ -79,34 +70,16 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { private boolean closed; private FileEncryptionInfo feInfo; - private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap; - private int retryCount; + // how much of data is actually written yet to underlying stream private long offset; // how much data has been ingested into the stream private long writeOffset; - // whether an exception is encountered while write and whole write could - // not succeed - private boolean isException; + private final BlockDataStreamOutputEntryPool blockDataStreamOutputEntryPool; private long clientID; - /** - * A constructor for testing purpose only. - */ - @VisibleForTesting - public KeyDataStreamOutput() { - closed = false; - this.retryPolicyMap = HddsClientUtils.getExceptionList() - .stream() - .collect(Collectors.toMap(Function.identity(), - e -> RetryPolicies.TRY_ONCE_THEN_FAIL)); - retryCount = 0; - offset = 0; - blockDataStreamOutputEntryPool = new BlockDataStreamOutputEntryPool(); - } - @VisibleForTesting public List<BlockDataStreamOutputEntry> getStreamEntries() { return blockDataStreamOutputEntryPool.getStreamEntries(); @@ -122,11 +95,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { return blockDataStreamOutputEntryPool.getLocationInfoList(); } - @VisibleForTesting - public int getRetryCount() { - return retryCount; - } - @VisibleForTesting public long getClientID() { return clientID; @@ -142,6 +110,8 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { String uploadID, int partNumber, boolean isMultipart, boolean unsafeByteBufferConversion ) { + super(HddsClientUtils.getRetryPolicyByException( + config.getMaxRetryCount(), config.getRetryInterval())); this.config = config; OmKeyInfo info = handler.getKeyInfo(); blockDataStreamOutputEntryPool = @@ -158,10 +128,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); - this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( - config.getMaxRetryCount(), config.getRetryInterval()); - this.retryCount = 0; - this.isException = false; this.writeOffset = 0; this.clientID = handler.getId(); } @@ -322,9 +288,10 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. - handleRetry(exception, bufferedDataLen); + handleRetry(exception); + handleWrite(null, 0, bufferedDataLen, true); // reset the retryCount after handling the exception - retryCount = 0; + resetRetryCount(); } } @@ -333,74 +300,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { closed = true; } - private void handleRetry(IOException exception, long len) throws IOException { - RetryPolicy retryPolicy = retryPolicyMap - .get(HddsClientUtils.checkForException(exception).getClass()); - if (retryPolicy == null) { - retryPolicy = retryPolicyMap.get(Exception.class); - } - RetryPolicy.RetryAction action = null; - try { - action = retryPolicy.shouldRetry(exception, retryCount, 0, true); - } catch (Exception e) { - setExceptionAndThrow(new IOException(e)); - } - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - String msg = ""; - if (action.reason != null) { - msg = "Retry request failed. " + action.reason; - LOG.error(msg, exception); - } - setExceptionAndThrow(new IOException(msg, exception)); - } - - // Throw the exception if the thread is interrupted - if (Thread.currentThread().isInterrupted()) { - LOG.warn("Interrupted while trying for retry"); - setExceptionAndThrow(exception); - } - Preconditions.checkArgument( - action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); - if (action.delayMillis > 0) { - try { - Thread.sleep(action.delayMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - IOException ioe = (IOException) new InterruptedIOException( - "Interrupted: action=" + action + ", retry policy=" + retryPolicy) - .initCause(e); - setExceptionAndThrow(ioe); - } - } - retryCount++; - if (LOG.isTraceEnabled()) { - LOG.trace("Retrying Write request. Already tried {} time(s); " + - "retry policy is {} ", retryCount, retryPolicy); - } - handleWrite(null, 0, len, true); - } - - private void setExceptionAndThrow(IOException ioe) throws IOException { - isException = true; - throw ioe; - } - - /** - * Checks if the provided exception signifies retry failure in ratis client. - * In case of retry failure, ratis client throws RaftRetryFailureException - * and all succeeding operations are failed with AlreadyClosedException. - */ - private boolean checkForRetryFailure(Throwable t) { - return t instanceof RaftRetryFailureException - || t instanceof AlreadyClosedException; - } - - // Every container specific exception from datatnode will be seen as - // StorageContainerException - private boolean checkIfContainerToExclude(Throwable t) { - return t instanceof StorageContainerException; - } - @Override public void flush() throws IOException { checkNotClosed(); @@ -485,7 +384,7 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { closed = true; try { handleFlushOrClose(StreamAction.CLOSE); - if (!isException) { + if (!isException()) { Preconditions.checkArgument(writeOffset == offset); } blockDataStreamOutputEntryPool.commitKey(offset); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 6225e25268..65f7348740 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -161,7 +161,7 @@ public class TestBlockDataStreamOutput { private void testWrite(int dataLength) throws Exception { String keyName = getKeyName(); OzoneDataStreamOutput key = createKey( - keyName, ReplicationType.RATIS, 0); + keyName, ReplicationType.RATIS, dataLength); byte[] data = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); @@ -174,7 +174,7 @@ public class TestBlockDataStreamOutput { private void testWriteWithFailure(int dataLength) throws Exception { String keyName = getKeyName(); OzoneDataStreamOutput key = createKey( - keyName, ReplicationType.RATIS, 0); + keyName, ReplicationType.RATIS, dataLength); byte[] data = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java index f4c756bccd..ad9eca6af7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.client.rpc; - +import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -46,10 +46,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; @@ -74,6 +72,11 @@ public class TestContainerStateMachineStream { private String volumeName; private String bucketName; + private static final int CHUNK_SIZE = 100; + private static final int FLUSH_SIZE = 2 * CHUNK_SIZE; + private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; + private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; + /** * Create a MiniDFSCluster for testing. * @@ -118,8 +121,15 @@ public class TestContainerStateMachineStream { conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); conf.setQuietMode(false); cluster = - MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200) + MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3) + .setHbInterval(200) .setDataStreamMinPacketSize(1024) + .setBlockSize(BLOCK_SIZE) + .setChunkSize(CHUNK_SIZE) + .setStreamBufferFlushSize(FLUSH_SIZE) + .setStreamBufferMaxSize(MAX_FLUSH_SIZE) + .setStreamBufferSizeUnit(StorageUnit.BYTES) .build(); cluster.waitForClusterToBeReady(); cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000); @@ -146,20 +156,14 @@ public class TestContainerStateMachineStream { @Test public void testContainerStateMachineForStreaming() throws Exception { - long size = 1024 * 8; + long size = CHUNK_SIZE + 1; OzoneDataStreamOutput key = TestHelper.createStreamKey( "ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore, volumeName, bucketName); - byte[] data = - ContainerTestHelper - .getFixedLengthString(UUID.randomUUID().toString(), - (int) (size / 2)) - .getBytes(UTF_8); + byte[] data = ContainerTestHelper.generateData((int) size, true); key.write(ByteBuffer.wrap(data)); - key.write(ByteBuffer.wrap(data)); - key.flush(); KeyDataStreamOutput streamOutput = @@ -181,4 +185,35 @@ public class TestContainerStateMachineStream { Assert.assertTrue(bytesUsed == size); } + + @Test + public void testContainerStateMachineForStreamingSmallFile() + throws Exception { + long size = CHUNK_SIZE - 1; + + OzoneDataStreamOutput key = TestHelper.createStreamKey( + "ozone-stream-test-small-file.txt", ReplicationType.RATIS, size, + objectStore, volumeName, bucketName); + + byte[] data = ContainerTestHelper.generateData((int) size, true); + key.write(ByteBuffer.wrap(data)); + key.flush(); + + KeyDataStreamOutput streamOutput = + (KeyDataStreamOutput) key.getByteBufStreamOutput(); + List<OmKeyLocationInfo> locationInfoList = + streamOutput.getLocationInfoList(); + key.close(); + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo, + cluster); + + long bytesUsed = dn.getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(omKeyLocationInfo.getContainerID()). + getContainerData().getBytesUsed(); + + Assert.assertTrue(bytesUsed == size); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
