This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit d72a8a09d573d63e2069851fd621ea78ad10aedd Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Mon Mar 28 20:43:38 2022 +0800 HDDS-6500. [Ozone-Streaming] Buffer the PutBlockRequest at the end of the stream. (#3229) --- .../hdds/scm/storage/BlockDataStreamOutput.java | 48 +++- .../org/apache/hadoop/hdds/ratis/RatisHelper.java | 26 ++ .../hdds/scm/storage/ContainerProtocolCalls.java | 14 +- .../server/ratis/ContainerStateMachine.java | 27 +- .../keyvalue/impl/KeyValueStreamDataChannel.java | 235 ++++++++++++++++ .../keyvalue/impl/StreamDataChannelBase.java | 3 +- .../impl/TestKeyValueStreamDataChannel.java | 313 +++++++++++++++++++++ .../client/rpc/TestBlockDataStreamOutput.java | 16 +- 8 files changed, 657 insertions(+), 25 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index d5b9dd9d81..d19f2aea13 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; @@ -83,6 +84,9 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock public class BlockDataStreamOutput implements ByteBufferStreamOutput { public static final Logger LOG = LoggerFactory.getLogger(BlockDataStreamOutput.class); + + public static final int PUT_BLOCK_REQUEST_LENGTH_MAX = 1 << 20; // 1MB + public static final String EXCEPTION_MSG = "Unexpected Storage Container Exception: "; private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {}; @@ -406,12 +410,26 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { byteBufferList = null; } waitFuturesComplete(); + final BlockData blockData = containerBlockData.build(); if (close) { - dataStreamCloseReply = out.closeAsync(); + final ContainerCommandRequestProto putBlockRequest + = ContainerProtocolCalls.getPutBlockRequest( + xceiverClient.getPipeline(), blockData, true, token); + dataStreamCloseReply = executePutBlockClose(putBlockRequest, + PUT_BLOCK_REQUEST_LENGTH_MAX, out); + dataStreamCloseReply.whenComplete((reply, e) -> { + if (e != null || reply == null || !reply.isSuccess()) { + LOG.warn("Failed executePutBlockClose, reply=" + reply, e); + try { + executePutBlock(true, false); + } catch (IOException ex) { + throw new CompletionException(ex); + } + } + }); } try { - BlockData blockData = containerBlockData.build(); XceiverClientReply asyncReply = putBlockAsync(xceiverClient, blockData, close, token); final CompletableFuture<ContainerCommandResponseProto> flushFuture @@ -459,6 +477,30 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { } } + public static CompletableFuture<DataStreamReply> executePutBlockClose( + ContainerCommandRequestProto putBlockRequest, int max, + DataStreamOutput out) { + final ByteBuffer putBlock = ContainerCommandRequestMessage.toMessage( + putBlockRequest, null).getContent().asReadOnlyByteBuffer(); + final ByteBuffer protoLength = getProtoLength(putBlock, max); + RatisHelper.debug(putBlock, "putBlock", LOG); + out.writeAsync(putBlock); + RatisHelper.debug(protoLength, "protoLength", LOG); + return out.writeAsync(protoLength, StandardWriteOption.CLOSE); + } + + public static ByteBuffer getProtoLength(ByteBuffer putBlock, int max) { + final int protoLength = putBlock.remaining(); + Preconditions.checkState(protoLength <= max, + "protoLength== %s > max = %s", protoLength, max); + final ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(protoLength); + buffer.flip(); + LOG.debug("protoLength = {}", protoLength); + Preconditions.checkState(buffer.remaining() == 4); + return buffer.asReadOnlyBuffer(); + } + @Override public void flush() throws IOException { if (xceiverClientFactory != null && xceiverClient != null @@ -547,7 +589,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { } - private void setIoException(Exception e) { + private void setIoException(Throwable e) { IOException ioe = getIoException(); if (ioe == null) { IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index e431c67df7..5b7ecb0c6b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.ratis; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; @@ -60,6 +61,7 @@ import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -457,4 +459,28 @@ public final class RatisHelper { return builder.build(); } + + public static void debug(ByteBuffer buffer, String name, Logger log) { + if (!log.isDebugEnabled()) { + return; + } + buffer = buffer.duplicate(); + final StringBuilder builder = new StringBuilder(); + for (int i = 1; buffer.remaining() > 0; i++) { + builder.append(buffer.get()).append(i % 20 == 0 ? "\n " : ", "); + } + log.debug("{}: {}\n {}", name, buffer, builder); + } + + public static void debug(ByteBuf buf, String name, Logger log) { + if (!log.isDebugEnabled()) { + return; + } + buf = buf.duplicate(); + final StringBuilder builder = new StringBuilder(); + for (int i = 1; buf.readableBytes() > 0; i++) { + builder.append(buf.readByte()).append(i % 20 == 0 ? "\n " : ", "); + } + log.debug("{}: {}\n {}", name, buf, builder); + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 16bef69712..e5dc4116f4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; @@ -233,11 +234,19 @@ public final class ContainerProtocolCalls { XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, Token<? extends TokenIdentifier> token) throws IOException, InterruptedException, ExecutionException { + final ContainerCommandRequestProto request = getPutBlockRequest( + xceiverClient.getPipeline(), containerBlockData, eof, token); + return xceiverClient.sendCommandAsync(request); + } + + public static ContainerCommandRequestProto getPutBlockRequest( + Pipeline pipeline, BlockData containerBlockData, boolean eof, + Token<? extends TokenIdentifier> token) throws IOException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder() .setBlockData(containerBlockData) .setEof(eof); - String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + final String id = pipeline.getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) .setContainerID(containerBlockData.getBlockID().getContainerID()) @@ -246,8 +255,7 @@ public final class ContainerProtocolCalls { if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } - ContainerCommandRequestProto request = builder.build(); - return xceiverClient.sendCommandAsync(request); + return builder.build(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 916d3e7f5b..b9707e5af2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -427,6 +427,20 @@ public class ContainerStateMachine extends BaseStateMachine { return dispatchCommand(requestProto, context); } + private CompletableFuture<ContainerCommandResponseProto> runCommandAsync( + ContainerCommandRequestProto requestProto, LogEntryProto entry) { + return CompletableFuture.supplyAsync(() -> { + final DispatcherContext context = new DispatcherContext.Builder() + .setTerm(entry.getTerm()) + .setLogIndex(entry.getIndex()) + .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA) + .setContainer2BCSIDMap(container2BCSIDMap) + .build(); + + return runCommand(requestProto, context); + }, executor); + } + private CompletableFuture<Message> handleWriteChunk( ContainerCommandRequestProto requestProto, long entryIndex, long term, long startTime) { @@ -560,19 +574,16 @@ public class ContainerStateMachine extends BaseStateMachine { "DataStream: " + stream + " is not closed properly")); } - final CompletableFuture<ContainerCommandResponseProto> f; + final ContainerCommandRequestProto request; if (dataChannel instanceof KeyValueStreamDataChannel) { - f = CompletableFuture.completedFuture(null); + request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest(); } else { return JavaUtils.completeExceptionally(new IllegalStateException( "Unexpected DataChannel " + dataChannel.getClass())); } - return f.whenComplete((res, e) -> { - if (LOG.isDebugEnabled()) { - LOG.debug("PutBlock {} Term: {} Index: {}", - res.getResult(), entry.getTerm(), entry.getIndex()); - } - }); + return runCommandAsync(request, entry).whenComplete( + (res, e) -> LOG.debug("link {}, entry: {}, request: {}", + res.getResult(), entry, request)); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 66723031f0..99dc40f5d0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -18,17 +18,131 @@ package org.apache.hadoop.ozone.container.keyvalue.impl; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; +import org.apache.ratis.util.ReferenceCountedObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * This class is used to get the DataChannel for streaming. */ public class KeyValueStreamDataChannel extends StreamDataChannelBase { + public static final Logger LOG = + LoggerFactory.getLogger(KeyValueStreamDataChannel.class); + + /** + * Keep the last {@link Buffers#max} bytes in the buffer + * in order to create putBlockRequest + * at {@link #closeBuffers(Buffers, WriteMethod)}}. + */ + static class Buffers { + private final Deque<ReferenceCountedObject<ByteBuffer>> deque + = new LinkedList<>(); + private final int max; + private int length; + + Buffers(int max) { + this.max = max; + } + + private boolean isExtra(int n) { + return length - n >= max; + } + + private boolean hasExtraBuffer() { + return Optional.ofNullable(deque.peek()) + .map(ReferenceCountedObject::get) + .filter(b -> isExtra(b.remaining())) + .isPresent(); + } + + /** + * @return extra buffers which are safe to be written. + */ + Iterable<ReferenceCountedObject<ByteBuffer>> offer( + ReferenceCountedObject<ByteBuffer> ref) { + final ByteBuffer buffer = ref.retain(); + LOG.debug("offer {}", buffer); + final boolean offered = deque.offer(ref); + Preconditions.checkState(offered, "Failed to offer"); + length += buffer.remaining(); + + return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() { + @Override + public boolean hasNext() { + return hasExtraBuffer(); + } + + @Override + public ReferenceCountedObject<ByteBuffer> next() { + final ReferenceCountedObject<ByteBuffer> polled = poll(); + length -= polled.get().remaining(); + Preconditions.checkState(length >= max); + return polled; + } + }; + } + + ReferenceCountedObject<ByteBuffer> poll() { + final ReferenceCountedObject<ByteBuffer> polled + = Objects.requireNonNull(deque.poll()); + RatisHelper.debug(polled.get(), "polled", LOG); + return polled; + } + + ReferenceCountedObject<ByteBuf> pollAll() { + Preconditions.checkState(!deque.isEmpty(), "The deque is empty"); + final ByteBuffer[] array = new ByteBuffer[deque.size()]; + final List<ReferenceCountedObject<ByteBuffer>> refs + = new ArrayList<>(deque.size()); + for (int i = 0; i < array.length; i++) { + final ReferenceCountedObject<ByteBuffer> ref = poll(); + refs.add(ref); + array[i] = ref.get(); + } + final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly(); + return ReferenceCountedObject.wrap(buf, () -> { + }, () -> { + buf.release(); + refs.forEach(ReferenceCountedObject::release); + }); + } + } + + interface WriteMethod { + int applyAsInt(ByteBuffer src) throws IOException; + } + + private final Buffers buffers = new Buffers( + BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX); + private final AtomicReference<ContainerCommandRequestProto> putBlockRequest + = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(); + KeyValueStreamDataChannel(File file, ContainerData containerData, ContainerMetrics metrics) throws StorageContainerException { @@ -39,4 +153,125 @@ public class KeyValueStreamDataChannel extends StreamDataChannelBase { ContainerProtos.Type getType() { return ContainerProtos.Type.StreamWrite; } + + @Override + public int write(ReferenceCountedObject<ByteBuffer> referenceCounted) + throws IOException { + assertOpen(); + return writeBuffers(referenceCounted, buffers, super::writeFileChannel); + } + + static int writeBuffers(ReferenceCountedObject<ByteBuffer> src, + Buffers buffers, WriteMethod writeMethod) + throws IOException { + for (ReferenceCountedObject<ByteBuffer> b : buffers.offer(src)) { + try { + writeFully(b.get(), writeMethod); + } finally { + b.release(); + } + } + return src.get().remaining(); + } + + private static void writeFully(ByteBuffer b, WriteMethod writeMethod) + throws IOException { + for (; b.remaining() > 0;) { + final int written = writeMethod.applyAsInt(b); + if (written <= 0) { + throw new IOException("Unable to write"); + } + } + } + + public ContainerCommandRequestProto getPutBlockRequest() { + return Objects.requireNonNull(putBlockRequest.get(), + () -> "putBlockRequest == null, " + this); + } + + void assertOpen() throws IOException { + if (closed.get()) { + throw new IOException("Already closed: " + this); + } + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + super.close(); + } + } + + static ContainerCommandRequestProto closeBuffers( + Buffers buffers, WriteMethod writeMethod) throws IOException { + final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll(); + final ByteBuf buf = ref.retain(); + final ContainerCommandRequestProto putBlockRequest; + try { + putBlockRequest = readPutBlockRequest(buf); + // write the remaining data + writeFully(buf.nioBuffer(), writeMethod); + } finally { + ref.release(); + } + return putBlockRequest; + } + + private static int readProtoLength(ByteBuf b, int lengthIndex) { + final int readerIndex = b.readerIndex(); + LOG.debug("{}, lengthIndex = {}, readerIndex = {}", + b, lengthIndex, readerIndex); + if (lengthIndex > readerIndex) { + b.readerIndex(lengthIndex); + } else { + Preconditions.checkState(lengthIndex == readerIndex); + } + RatisHelper.debug(b, "readProtoLength", LOG); + return b.nioBuffer().getInt(); + } + + static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) + throws IOException { + // readerIndex protoIndex lengthIndex readerIndex+readableBytes + // V V V V + // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---| + final int readerIndex = b.readerIndex(); + final int lengthIndex = readerIndex + b.readableBytes() - 4; + final int protoLength = readProtoLength(b.duplicate(), lengthIndex); + final int protoIndex = lengthIndex - protoLength; + + final ContainerCommandRequestProto proto; + try { + proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer()); + } catch (Throwable t) { + RatisHelper.debug(b, "catch", LOG); + throw new IOException("Failed to readPutBlockRequest from " + b + + ": readerIndex=" + readerIndex + + ", protoIndex=" + protoIndex + + ", protoLength=" + protoLength + + ", lengthIndex=" + lengthIndex, t); + } + + // set index for reading data + b.writerIndex(protoIndex); + + return proto; + } + + private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b) + throws IOException { + RatisHelper.debug(b, "readPutBlockRequest", LOG); + final ByteString byteString = ByteString.copyFrom(b); + + final ContainerCommandRequestProto request = + ContainerCommandRequestMessage.toProto(byteString, null); + + if (!request.hasPutBlock()) { + throw new StorageContainerException( + "Malformed PutBlock request. trace ID: " + request.getTraceID(), + ContainerProtos.Result.MALFORMED_REQUEST); + } + return request; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index b31e2ccbf4..9829033248 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -79,8 +79,7 @@ abstract class StreamDataChannelBase implements StateMachine.DataChannel { randomAccessFile.close(); } - @Override - public int write(ByteBuffer src) throws IOException { + final int writeFileChannel(ByteBuffer src) throws IOException { final int writeBytes = getChannel().write(src); metrics.incContainerBytesStats(getType(), writeBytes); containerData.updateWriteStats(writeBytes, false); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java new file mode 100644 index 0000000000..d252b1cb1b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.impl; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod; +import org.apache.ratis.client.api.DataStreamOutput; +import org.apache.ratis.io.FilePositionCount; +import org.apache.ratis.io.StandardWriteOption; +import org.apache.ratis.io.WriteOption; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.DataStreamReply; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; +import org.apache.ratis.util.ReferenceCountedObject; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX; +import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose; +import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers; + +/** For testing {@link KeyValueStreamDataChannel}. */ +public class TestKeyValueStreamDataChannel { + public static final Logger LOG = + LoggerFactory.getLogger(TestKeyValueStreamDataChannel.class); + + static final ContainerCommandRequestProto PUT_BLOCK_PROTO + = ContainerCommandRequestProto.newBuilder() + .setCmdType(Type.PutBlock) + .setPutBlock(PutBlockRequestProto.newBuilder().setBlockData( + BlockData.newBuilder().setBlockID(DatanodeBlockID.newBuilder() + .setContainerID(222).setLocalID(333).build()).build())) + .setDatanodeUuid("datanodeId") + .setContainerID(111L) + .build(); + static final int PUT_BLOCK_PROTO_SIZE = PUT_BLOCK_PROTO.toByteString().size(); + static { + LOG.info("PUT_BLOCK_PROTO_SIZE = {}", PUT_BLOCK_PROTO_SIZE); + } + + @Test + public void testSerialization() throws Exception { + final int max = PUT_BLOCK_REQUEST_LENGTH_MAX; + final ByteBuffer putBlockBuf = ContainerCommandRequestMessage.toMessage( + PUT_BLOCK_PROTO, null).getContent().asReadOnlyByteBuffer(); + final ByteBuffer protoLengthBuf = getProtoLength(putBlockBuf, max); + + // random data size + final int dataSize = ThreadLocalRandom.current().nextInt(1000) + 100; + final byte[] data = new byte[dataSize]; + + //serialize + final ByteBuf buf = Unpooled.buffer(max); + buf.writeBytes(data); + buf.writeBytes(putBlockBuf); + buf.writeBytes(protoLengthBuf); + + final ContainerCommandRequestProto proto = readPutBlockRequest(buf); + Assert.assertEquals(PUT_BLOCK_PROTO, proto); + } + + @Test + public void testBuffers() throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(32); + final List<CompletableFuture<String>> futures = new ArrayList<>(); + + final int min = PUT_BLOCK_PROTO_SIZE + 4; + final int[] maxValues = {min, 2 * min, 10 * min}; + final int[] dataSizes = {0, 10, 100, 10_000}; + for (int max : maxValues) { + for (int dataSize : dataSizes) { + futures.add(CompletableFuture.supplyAsync( + () -> runTestBuffers(dataSize, max), executor)); + } + } + + for (CompletableFuture<String> f : futures) { + f.get(); + } + } + + static String runTestBuffers(int dataSize, int max) { + final int seed = ThreadLocalRandom.current().nextInt(); + final String name = String.format("[dataSize=%d,max=%d,seed=%H]", + dataSize, max, seed); + LOG.info(name); + try { + runTestBuffers(dataSize, max, seed, name); + } catch (Throwable t) { + throw new CompletionException("Failed " + name, t); + } + return name; + } + + static void runTestBuffers(int dataSize, int max, int seed, String name) + throws Exception { + Assert.assertTrue(max >= PUT_BLOCK_PROTO_SIZE); + + // random data + final byte[] data = new byte[dataSize]; + final Random random = new Random(seed); + random.nextBytes(data); + + // write output + final Buffers buffers = new Buffers(max); + final Output out = new Output(buffers); + for (int offset = 0; offset < dataSize;) { + final int randomLength = random.nextInt(4 * max); + final int length = Math.min(randomLength, dataSize - offset); + LOG.info("{}: offset = {}, length = {}", name, offset, length); + final ByteBuffer b = ByteBuffer.wrap(data, offset, length); + final DataStreamReply writeReply = out.writeAsync(b).get(); + assertReply(writeReply, length, null); + offset += length; + } + + // close + final DataStreamReply closeReply = executePutBlockClose( + PUT_BLOCK_PROTO, max, out).get(); + assertReply(closeReply, 0, PUT_BLOCK_PROTO); + + // check output + final ByteBuf outBuf = out.getOutBuf(); + LOG.info("outBuf = {}", outBuf); + Assert.assertEquals(dataSize, outBuf.readableBytes()); + for (int i = 0; i < dataSize; i++) { + Assert.assertEquals(data[i], outBuf.readByte()); + } + outBuf.release(); + } + + static void assertReply(DataStreamReply reply, int byteWritten, + ContainerCommandRequestProto proto) { + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(byteWritten, reply.getBytesWritten()); + Assert.assertEquals(proto, ((Reply)reply).getPutBlockRequest()); + } + + static class Output implements DataStreamOutput { + private final Buffers buffers; + private final ByteBuf outBuf = Unpooled.buffer(); + private final WriteMethod writeMethod = src -> { + final int remaining = src.remaining(); + outBuf.writeBytes(src); + return remaining; + }; + + Output(Buffers buffers) { + this.buffers = buffers; + } + + ByteBuf getOutBuf() { + return outBuf; + } + + @Override + public CompletableFuture<DataStreamReply> writeAsync( + ByteBuffer src, WriteOption... writeOptions) { + final int written; + try { + written = writeBuffers( + ReferenceCountedObject.wrap(src, () -> { }, () -> { }), + buffers, writeMethod); + } catch (IOException e) { + return completeExceptionally(e); + } + if (WriteOption.containsOption(writeOptions, StandardWriteOption.CLOSE)) { + return closeAsync(); + } + return CompletableFuture.completedFuture( + new Reply(true, written)); + } + + @Override + public CompletableFuture<DataStreamReply> closeAsync() { + final ContainerCommandRequestProto putBlockRequest; + try { + putBlockRequest = closeBuffers(buffers, writeMethod); + } catch (IOException e) { + return completeExceptionally(e); + } + return CompletableFuture.completedFuture( + new Reply(true, 0, putBlockRequest)); + } + + @Override + public CompletableFuture<DataStreamReply> writeAsync( + FilePositionCount filePositionCount, WriteOption... writeOptions) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() { + throw new UnsupportedOperationException(); + } + + @Override + public WritableByteChannel getWritableByteChannel() { + throw new UnsupportedOperationException(); + } + } + + static class Reply implements DataStreamReply { + private final boolean success; + private final long bytesWritten; + private final ContainerCommandRequestProto putBlockRequest; + + Reply(boolean success, long bytesWritten) { + this(success, bytesWritten, null); + } + + Reply(boolean success, long bytesWritten, + ContainerCommandRequestProto putBlockRequest) { + this.success = success; + this.bytesWritten = bytesWritten; + this.putBlockRequest = putBlockRequest; + } + + ContainerCommandRequestProto getPutBlockRequest() { + return putBlockRequest; + } + + @Override + public boolean isSuccess() { + return success; + } + + @Override + public long getBytesWritten() { + return bytesWritten; + } + + @Override + public Collection<CommitInfoProto> getCommitInfos() { + throw new UnsupportedOperationException(); + } + + @Override + public ClientId getClientId() { + throw new UnsupportedOperationException(); + } + + @Override + public DataStreamPacketHeaderProto.Type getType() { + throw new UnsupportedOperationException(); + } + + @Override + public long getStreamId() { + throw new UnsupportedOperationException(); + } + + @Override + public long getStreamOffset() { + throw new UnsupportedOperationException(); + } + + @Override + public long getDataLength() { + throw new UnsupportedOperationException(); + } + } + + static CompletableFuture<DataStreamReply> completeExceptionally(Throwable t) { + final CompletableFuture<DataStreamReply> f = new CompletableFuture<>(); + f.completeExceptionally(t); + return f; + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 65f7348740..c8a0115a80 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -120,7 +120,7 @@ public class TestBlockDataStreamOutput { objectStore.getVolume(volumeName).createBucket(bucketName); } - private String getKeyName() { + static String getKeyName() { return UUID.randomUUID().toString(); } @@ -158,13 +158,11 @@ public class TestBlockDataStreamOutput { testWriteWithFailure(blockSize + 50); } - private void testWrite(int dataLength) throws Exception { + static void testWrite(int dataLength) throws Exception { String keyName = getKeyName(); OzoneDataStreamOutput key = createKey( keyName, ReplicationType.RATIS, dataLength); - byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, dataLength) - .getBytes(UTF_8); + final byte[] data = ContainerTestHelper.generateData(dataLength, false); key.write(ByteBuffer.wrap(data)); // now close the stream, It will update the key length. key.close(); @@ -221,14 +219,14 @@ public class TestBlockDataStreamOutput { } - private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, + static OzoneDataStreamOutput createKey(String keyName, ReplicationType type, long size) throws Exception { return TestHelper.createStreamKey( keyName, type, size, objectStore, volumeName, bucketName); } - private void validateData(String keyName, byte[] data) throws Exception { - TestHelper - .validateData(keyName, data, objectStore, volumeName, bucketName); + static void validateData(String keyName, byte[] data) throws Exception { + TestHelper.validateData( + keyName, data, objectStore, volumeName, bucketName); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
