This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 91e5d2eb28 HDDS-9130. [hsync] Combine WriteData and PutBlock requests
into one (#5980)
91e5d2eb28 is described below
commit 91e5d2eb289fcbe8ab1e7c4a40d4f0c839598597
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Apr 1 01:20:15 2024 -0700
HDDS-9130. [hsync] Combine WriteData and PutBlock requests into one (#5980)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 16 ++
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 168 +++++++++++++++------
.../hdds/scm/storage/ECBlockOutputStream.java | 5 +-
.../org/apache/hadoop/hdds/DatanodeVersion.java | 2 +
.../ContainerCommandResponseBuilders.java | 23 +++
.../hdds/scm/storage/ContainerProtocolCalls.java | 10 +-
.../server/ratis/ContainerStateMachine.java | 5 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 27 +++-
.../container/keyvalue/impl/BlockManagerImpl.java | 3 +-
.../container/metadata/AbstractDatanodeStore.java | 2 +-
.../src/main/proto/DatanodeClientProtocol.proto | 2 +
.../hadoop/ozone/client/MockXceiverClientSpi.java | 34 +++--
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 26 ++--
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 2 +-
.../ozone/client/rpc/TestBlockOutputStream.java | 21 ++-
.../ozone/om/TestOmContainerLocationCache.java | 36 ++++-
16 files changed, 288 insertions(+), 94 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 4af9009e16..d1992ac931 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -256,6 +256,14 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;
+ @Config(key = "stream.putblock.piggybacking",
+ defaultValue = "false",
+ type = ConfigType.BOOLEAN,
+ description = "Allow PutBlock to be piggybacked in WriteChunk " +
+ "requests if the chunk is small.",
+ tags = ConfigTag.CLIENT)
+ private boolean enablePutblockPiggybacking = false;
+
@PostConstruct
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
@@ -454,6 +462,14 @@ public class OzoneClientConfig {
return fsDefaultBucketLayout;
}
+ public void setEnablePutblockPiggybacking(boolean
enablePutblockPiggybacking) {
+ this.enablePutblockPiggybacking = enablePutblockPiggybacking;
+ }
+
+ public boolean getEnablePutblockPiggybacking() {
+ return enablePutblockPiggybacking;
+ }
+
public boolean isDatastreamPipelineMode() {
return datastreamPipelineMode;
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 239800746c..f29bf49038 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
+import static
org.apache.hadoop.hdds.DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC;
import static
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -140,6 +142,7 @@ public class BlockOutputStream extends OutputStream {
private int replicationIndex;
private Pipeline pipeline;
private final ContainerClientMetrics clientMetrics;
+ private boolean allowPutBlockPiggybacking;
/**
* Creates a new BlockOutputStream.
@@ -211,6 +214,20 @@ public class BlockOutputStream extends OutputStream {
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
+ this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
+ allDataNodesSupportPiggybacking();
+ }
+
+ private boolean allDataNodesSupportPiggybacking() {
+ // return true only if all DataNodes in the pipeline are on a version
+ // that supports PutBlock piggybacking.
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ if (dn.getCurrentVersion() <
+ COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue()) {
+ return false;
+ }
+ }
+ return true;
}
void refreshCurrentBuffer() {
@@ -499,22 +516,8 @@ public class BlockOutputStream extends OutputStream {
}
// if the ioException is not set, putBlock is successful
if (getIoException() == null && !force) {
- BlockID responseBlockID = BlockID.getFromProtobuf(
- e.getPutBlock().getCommittedBlockLength().getBlockID());
- Preconditions.checkState(blockID.get().getContainerBlockID()
- .equals(responseBlockID.getContainerBlockID()));
- // updates the bcsId of the block
- blockID.set(responseBlockID);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Adding index " + asyncReply.getLogIndex() + " flushLength "
- + flushPos + " numBuffers " + byteBufferList.size()
- + " blockID " + blockID + " bufferPool size" + bufferPool
- .getSize() + " currentBufferIndex " + bufferPool
- .getCurrentBufferIndex());
- }
- // for standalone protocol, logIndex will always be 0.
- updateCommitInfo(asyncReply, byteBufferList);
+ handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
+ asyncReply, flushPos, byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -551,7 +554,7 @@ public class BlockOutputStream extends OutputStream {
}
}
- private void writeChunk(ChunkBuffer buffer)
+ private void writeChunkCommon(ChunkBuffer buffer)
throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
// be added to the bufferList. Once putBlock gets executed, this list will
@@ -562,7 +565,18 @@ public class BlockOutputStream extends OutputStream {
bufferList = new ArrayList<>();
}
bufferList.add(buffer);
- writeChunkToContainer(buffer.duplicate(0, buffer.position()));
+ }
+
+ private void writeChunk(ChunkBuffer buffer)
+ throws IOException {
+ writeChunkCommon(buffer);
+ writeChunkToContainer(buffer.duplicate(0, buffer.position()), false);
+ }
+
+ private void writeChunkAndPutBlock(ChunkBuffer buffer)
+ throws IOException {
+ writeChunkCommon(buffer);
+ writeChunkToContainer(buffer.duplicate(0, buffer.position()), true);
}
/**
@@ -594,14 +608,23 @@ public class BlockOutputStream extends OutputStream {
if (totalDataFlushedLength < writtenDataLength) {
refreshCurrentBuffer();
Preconditions.checkArgument(currentBuffer.position() > 0);
- if (currentBuffer.hasRemaining()) {
- writeChunk(currentBuffer);
- }
+
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
- updateFlushLength();
- executePutBlock(close, false);
+ if (currentBuffer.hasRemaining()) {
+ if (allowPutBlockPiggybacking) {
+ updateFlushLength();
+ writeChunkAndPutBlock(currentBuffer);
+ } else {
+ writeChunk(currentBuffer);
+ updateFlushLength();
+ executePutBlock(close, false);
+ }
+ } else {
+ updateFlushLength();
+ executePutBlock(close, false);
+ }
} else if (close) {
// forcing an "empty" putBlock if stream is being closed without new
// data since latest flush - we need to send the "EOF" flag
@@ -713,7 +736,7 @@ public class BlockOutputStream extends OutputStream {
* @return
*/
CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
- ChunkBuffer chunk) throws IOException {
+ ChunkBuffer chunk, boolean putBlockPiggybacking) throws IOException {
int effectiveChunkSize = chunk.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
final ByteString data = chunk.toByteString(
@@ -726,6 +749,8 @@ public class BlockOutputStream extends OutputStream {
.setChecksumData(checksumData.getProtoBufMessage())
.build();
+ long flushPos = totalDataFlushedLength;
+
if (LOG.isDebugEnabled()) {
LOG.debug("Writing chunk {} length {} at offset {}",
chunkInfo.getChunkName(), effectiveChunkSize, offset);
@@ -743,42 +768,93 @@ public class BlockOutputStream extends OutputStream {
+ ", previous = " + previous);
}
+ final List<ChunkBuffer> byteBufferList;
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ validateFuture = null;
try {
- XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
- blockID.get(), data, tokenString, replicationIndex);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- respFuture = asyncReply.getResponse();
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- validateFuture = respFuture.thenApplyAsync(e -> {
- try {
- validateResponse(e);
- } catch (IOException sce) {
- respFuture.completeExceptionally(sce);
- }
- return e;
- }, responseExecutor).exceptionally(e -> {
- String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
- " into block " + blockID;
- LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
- CompletionException ce = new CompletionException(msg, e);
- setIoException(ce);
- throw ce;
- });
+ BlockData blockData = null;
+
if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}
+ if (putBlockPiggybacking) {
+ Preconditions.checkNotNull(bufferList);
+ byteBufferList = bufferList;
+ bufferList = null;
+ Preconditions.checkNotNull(byteBufferList);
+
+ blockData = containerBlockData.build();
+ LOG.debug("piggyback chunk list {}", blockData);
+
+ if (config.getIncrementalChunkList()) {
+ // clear the chunks in containerBlockData: they are already captured
+ // in the blockData built above and go out with this request.
+ containerBlockData.clearChunks();
+ }
+ } else {
+ byteBufferList = null;
+ }
+ XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
+ blockID.get(), data, tokenString, replicationIndex, blockData);
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ respFuture = asyncReply.getResponse();
+ validateFuture = respFuture.thenApplyAsync(e -> {
+ try {
+ validateResponse(e);
+ } catch (IOException sce) {
+ respFuture.completeExceptionally(sce);
+ }
+ // if the ioException is not set, putBlock is successful
+ if (getIoException() == null && putBlockPiggybacking) {
+ handleSuccessfulPutBlock(e.getWriteChunk().getCommittedBlockLength(),
+ asyncReply, flushPos, byteBufferList);
+ }
+ return e;
+ }, responseExecutor).exceptionally(e -> {
+ String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+ " into block " + blockID;
+ LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+ CompletionException ce = new CompletionException(msg, e);
+ setIoException(ce);
+ throw ce;
+ });
clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
- return validateFuture;
+
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
- return null;
+ if (putBlockPiggybacking) {
+ putFlushFuture(flushPos, validateFuture);
+ }
+ return validateFuture;
+ }
+
+ private void handleSuccessfulPutBlock(
+ ContainerProtos.GetCommittedBlockLengthResponseProto e,
+ XceiverClientReply asyncReply, long flushPos,
+ List<ChunkBuffer> byteBufferList) {
+ BlockID responseBlockID = BlockID.getFromProtobuf(
+ e.getBlockID());
+ Preconditions.checkState(blockID.get().getContainerBlockID()
+ .equals(responseBlockID.getContainerBlockID()));
+ // updates the bcsId of the block
+ blockID.set(responseBlockID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Adding index " + asyncReply.getLogIndex() + " flushLength "
+ + flushPos + " numBuffers " + byteBufferList.size()
+ + " blockID " + blockID + " bufferPool size" + bufferPool
+ .getSize() + " currentBufferIndex " + bufferPool
+ .getCurrentBufferIndex());
+ }
+ // for standalone protocol, logIndex will always be 0.
+ updateCommitInfo(asyncReply, byteBufferList);
}
/**
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index adecc3e4c1..c8bfaf3e1b 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -89,13 +89,14 @@ public class ECBlockOutputStream extends BlockOutputStream {
@Override
public void write(byte[] b, int off, int len) throws IOException {
this.currentChunkRspFuture =
- writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+ writeChunkToContainer(
+ ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false);
updateWrittenDataLength(len);
}
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
write(
ByteBuffer buff) throws IOException {
- return writeChunkToContainer(ChunkBuffer.wrap(buff));
+ return writeChunkToContainer(ChunkBuffer.wrap(buff), false);
}
public CompletableFuture<ContainerProtos.
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
index 30f9df597b..e35d20d53e 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
@@ -31,6 +31,8 @@ public enum DatanodeVersion implements ComponentVersion {
DEFAULT_VERSION(0, "Initial version"),
SEPARATE_RATIS_PORTS_AVAILABLE(1, "Version with separated Ratis port."),
+ COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " +
+ "a PutBlock request"),
FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 86336e9bc7..d3f39c023b 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -40,6 +40,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFi
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -213,6 +214,28 @@ public final class ContainerCommandResponseBuilders {
.build();
}
+ /**
+ * Gets a response for the WriteChunk RPC.
+ * @param msg - ContainerCommandRequestProto
+ * @return - ContainerCommandResponseProto
+ */
+ public static ContainerCommandResponseProto getWriteChunkResponseSuccess(
+ ContainerCommandRequestProto msg, BlockData blockData) {
+
+ WriteChunkResponseProto.Builder writeChunk =
+ WriteChunkResponseProto.newBuilder();
+ if (blockData != null) {
+ writeChunk.setCommittedBlockLength(
+ getCommittedBlockLengthResponseBuilder(
+ blockData.getSize(), blockData.getBlockID()));
+
+ }
+ return getSuccessResponseBuilder(msg)
+ .setCmdType(Type.WriteChunk)
+ .setWriteChunk(writeChunk)
+ .build();
+ }
+
/**
* Gets a response to the read small file call.
* @param request - Msg
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1453ae56b4..702710474a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -406,8 +406,10 @@ public final class ContainerProtocolCalls {
*/
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
- ByteString data, String tokenString, int replicationIndex)
+ ByteString data, String tokenString,
+ int replicationIndex, BlockData blockData)
throws IOException, ExecutionException, InterruptedException {
+
WriteChunkRequestProto.Builder writeChunkRequest =
WriteChunkRequestProto.newBuilder()
.setBlockID(DatanodeBlockID.newBuilder()
@@ -418,6 +420,12 @@ public final class ContainerProtocolCalls {
.build())
.setChunkData(chunk)
.setData(data);
+ if (blockData != null) {
+ PutBlockRequestProto.Builder createBlockRequest =
+ PutBlockRequestProto.newBuilder()
+ .setBlockData(blockData);
+ writeChunkRequest.setBlock(createBlockRequest);
+ }
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f4ffc4ef27..9eb5b909cc 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -432,11 +432,10 @@ public class ContainerStateMachine extends
BaseStateMachine {
if (!blockAlreadyFinalized) {
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
- WriteChunkRequestProto.newBuilder()
- .setBlockID(write.getBlockID())
- .setChunkData(write.getChunkData())
+ WriteChunkRequestProto.newBuilder(write)
// skipping the data field as it is
// already set in statemachine data proto
+ .clearData()
.build();
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 594500b77b..8a2a1bc56e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -112,6 +112,7 @@ import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder;
+import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getWriteChunkResponseSuccess;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
@@ -780,6 +781,7 @@ public class KeyValueHandler extends Handler {
data = chunkManager.readChunk(kvContainer, blockID, chunkInfo,
dispatcherContext);
+ LOG.debug("read chunk from block {} chunk {}", blockID, chunkInfo);
// Validate data only if the read chunk is issued by Ratis for its
// internal logic.
// For client reads, the client is expected to validate.
@@ -841,6 +843,7 @@ public class KeyValueHandler extends Handler {
return malformedRequest(request);
}
+ ContainerProtos.BlockData blockDataProto = null;
try {
checkContainerOpen(kvContainer);
@@ -864,6 +867,28 @@ public class KeyValueHandler extends Handler {
chunkManager
.writeChunk(kvContainer, blockID, chunkInfo, data,
dispatcherContext);
+ final boolean isCommit = dispatcherContext.getStage().isCommit();
+ if (isCommit && writeChunk.hasBlock()) {
+ metrics.incContainerOpsMetrics(Type.PutBlock);
+ BlockData blockData = BlockData.getFromProtoBuf(
+ writeChunk.getBlock().getBlockData());
+ // optimization for hsync when WriteChunk is in commit phase:
+ //
+ // block metadata is piggybacked in the same message.
+ // there will not be an additional PutBlock request.
+ //
+ // End of block will always be sent as a standalone PutBlock.
+ // the PutBlock piggybacked in WriteChunk is never end of block.
+ //
+ // do not do this in WRITE_DATA phase otherwise PutBlock will be out
+ // of order.
+ blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
+ blockManager.putBlock(kvContainer, blockData, false);
+ blockDataProto = blockData.getProtoBufMessage();
+ final long numBytes = blockDataProto.getSerializedSize();
+ metrics.incContainerBytesStats(Type.PutBlock, numBytes);
+ }
+
// We should increment stats after writeChunk
if (isWrite) {
metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
@@ -877,7 +902,7 @@ public class KeyValueHandler extends Handler {
request);
}
- return getSuccessResponse(request);
+ return getWriteChunkResponseSuccess(request, blockDataProto);
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 4690565b0b..413f36a761 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -54,7 +54,8 @@ import org.slf4j.LoggerFactory;
*/
public class BlockManagerImpl implements BlockManager {
- static final Logger LOG = LoggerFactory.getLogger(BlockManagerImpl.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockManagerImpl.class);
private ConfigurationSource config;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
index 8d55bfad78..26719d7f03 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
@@ -66,7 +66,7 @@ public abstract class AbstractDatanodeStore implements
DatanodeStore {
private Table<String, Long> finalizeBlocksTableWithIterator;
- static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(AbstractDatanodeStore.class);
private volatile DBStore store;
private final AbstractDatanodeDBDefinition dbDef;
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index ccde261de0..f5cac29923 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -436,9 +436,11 @@ message WriteChunkRequestProto {
required DatanodeBlockID blockID = 1;
optional ChunkInfo chunkData = 2;
optional bytes data = 3;
+ optional PutBlockRequestProto block = 4;
}
message WriteChunkResponseProto {
+ optional GetCommittedBlockLengthResponseProto committedBlockLength = 1;
}
enum ReadChunkVersion {
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 7e5de329d1..0d82f0f8bb 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -129,21 +129,26 @@ public class MockXceiverClientSpi extends
XceiverClientSpi {
}
private PutBlockResponseProto putBlock(PutBlockRequestProto putBlock) {
+ return PutBlockResponseProto.newBuilder()
+ .setCommittedBlockLength(
+ doPutBlock(putBlock.getBlockData()))
+ .build();
+ }
+
+ private GetCommittedBlockLengthResponseProto doPutBlock(
+ ContainerProtos.BlockData blockData) {
long length = 0;
- for (ChunkInfo chunk : putBlock.getBlockData().getChunksList()) {
+ for (ChunkInfo chunk : blockData.getChunksList()) {
length += chunk.getLen();
}
- datanodeStorage.putBlock(putBlock.getBlockData().getBlockID(),
- putBlock.getBlockData());
+ datanodeStorage.putBlock(blockData.getBlockID(),
+ blockData);
- return PutBlockResponseProto.newBuilder()
- .setCommittedBlockLength(
- GetCommittedBlockLengthResponseProto.newBuilder()
- .setBlockID(putBlock.getBlockData().getBlockID())
+ return GetCommittedBlockLengthResponseProto.newBuilder()
+ .setBlockID(blockData.getBlockID())
.setBlockLength(length)
- .build())
- .build();
+ .build();
}
private XceiverClientReply result(
@@ -166,8 +171,15 @@ public class MockXceiverClientSpi extends XceiverClientSpi
{
datanodeStorage
.writeChunk(writeChunk.getBlockID(), writeChunk.getChunkData(),
writeChunk.getData());
- return WriteChunkResponseProto.newBuilder()
- .build();
+
+ WriteChunkResponseProto.Builder builder =
+ WriteChunkResponseProto.newBuilder();
+ if (writeChunk.hasBlock()) {
+ ContainerProtos.BlockData
+ blockData = writeChunk.getBlock().getBlockData();
+ builder.setCommittedBlockLength(doPutBlock(blockData));
+ }
+ return builder.build();
}
@Override
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 015eaa2916..daa433f68f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -33,27 +33,27 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.StorageType;
-import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -73,6 +73,9 @@ import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
+import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -81,9 +84,6 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
-import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
@@ -156,6 +156,7 @@ public class TestHSync {
// Reduce KeyDeletingService interval
CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
TimeUnit.MILLISECONDS);
CONF.setBoolean("ozone.client.incremental.chunk.list", true);
+ CONF.setBoolean("ozone.client.stream.putblock.piggybacking", true);
CONF.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
.setBlockSize(BLOCK_SIZE)
@@ -177,11 +178,11 @@ public class TestHSync {
bucket = TestDataUtil.createVolumeAndBucket(client, layout);
// Enable DEBUG level logging for relevant classes
- GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
- GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
- GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(BlockManagerImpl.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(AbstractDatanodeStore.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG);
}
@AfterAll
@@ -549,7 +550,8 @@ public class TestHSync {
break;
}
for (int i = 0; i < n; i++) {
- assertEquals(data[offset + i], buffer[i]);
+ assertEquals(data[offset + i], buffer[i],
+ "expected at offset " + offset + " i=" + i);
}
offset += n;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 161bf3c3b9..9c76c0ec0c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -275,7 +275,7 @@ public interface MiniOzoneCluster extends AutoCloseable {
protected boolean includeRecon = false;
protected int dnInitialVersion =
DatanodeVersion.FUTURE_VERSION.toProtoValue();
- protected int dnCurrentVersion =
DatanodeVersion.FUTURE_VERSION.toProtoValue();
+ protected int dnCurrentVersion =
DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue();
protected int numOfDatanodes = 3;
protected boolean startDataNodes = true;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index e15e1e4d63..ce5432739c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -80,8 +80,13 @@ class TestBlockOutputStream {
static MiniOzoneCluster createCluster() throws IOException,
InterruptedException, TimeoutException {
-
OzoneConfiguration conf = new OzoneConfiguration();
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumType(ChecksumType.NONE);
+ clientConfig.setStreamBufferFlushDelay(false);
+ clientConfig.setEnablePutblockPiggybacking(true);
+ conf.setFromObject(clientConfig);
+
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
conf.setQuietMode(false);
@@ -397,7 +402,7 @@ class TestBlockOutputStream {
key.flush();
assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 1,
+ assertEquals(putBlockCount,
metrics.getContainerOpCountMetrics(PutBlock));
assertEquals(pendingWriteChunkCount,
metrics.getPendingContainerOpCountMetrics(WriteChunk));
@@ -426,9 +431,9 @@ class TestBlockOutputStream {
metrics.getPendingContainerOpCountMetrics(PutBlock));
assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 2,
+ assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(totalOpCount + 4, metrics.getTotalOpCount());
+ assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET);
@@ -493,9 +498,9 @@ class TestBlockOutputStream {
metrics.getPendingContainerOpCountMetrics(PutBlock));
assertEquals(writeChunkCount + 3,
metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(putBlockCount + 2,
+ assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(totalOpCount + 5, metrics.getTotalOpCount());
+ assertEquals(totalOpCount + 4, metrics.getTotalOpCount());
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -686,9 +691,9 @@ class TestBlockOutputStream {
assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(WriteChunk));
// The previous flush did not trigger any action with flushDelay enabled
- assertEquals(putBlockCount + (flushDelay ? 3 : 4),
+ assertEquals(putBlockCount + (flushDelay ? 2 : 3),
metrics.getContainerOpCountMetrics(PutBlock));
- assertEquals(totalOpCount + (flushDelay ? 8 : 9),
+ assertEquals(totalOpCount + (flushDelay ? 7 : 8),
metrics.getTotalOpCount());
assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 2ae69dc3c9..e773bf7ed7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -607,18 +607,40 @@ public class TestOmContainerLocationCache {
private void mockWriteChunkResponse(XceiverClientSpi mockDnProtocol)
throws IOException, ExecutionException, InterruptedException {
- ContainerCommandResponseProto writeResponse =
- ContainerCommandResponseProto.newBuilder()
- .setWriteChunk(WriteChunkResponseProto.newBuilder().build())
- .setResult(Result.SUCCESS)
- .setCmdType(Type.WriteChunk)
- .build();
doAnswer(invocation ->
- new XceiverClientReply(completedFuture(writeResponse)))
+ new XceiverClientReply(
+ completedFuture(
+ createWriteChunkResponse(
+ (ContainerCommandRequestProto)invocation.getArgument(0)))))
.when(mockDnProtocol)
.sendCommandAsync(argThat(matchCmd(Type.WriteChunk)));
}
+ ContainerCommandResponseProto createWriteChunkResponse(
+ ContainerCommandRequestProto request) {
+ ContainerProtos.WriteChunkRequestProto writeChunk = request.getWriteChunk();
+
+ WriteChunkResponseProto.Builder builder =
+ WriteChunkResponseProto.newBuilder();
+ if (writeChunk.hasBlock()) {
+ ContainerProtos.BlockData
+ blockData = writeChunk.getBlock().getBlockData();
+
+ GetCommittedBlockLengthResponseProto response =
+ GetCommittedBlockLengthResponseProto.newBuilder()
+ .setBlockID(blockData.getBlockID())
+ .setBlockLength(blockData.getSize())
+ .build();
+
+ builder.setCommittedBlockLength(response);
+ }
+ return ContainerCommandResponseProto.newBuilder()
+ .setWriteChunk(builder.build())
+ .setResult(Result.SUCCESS)
+ .setCmdType(Type.WriteChunk)
+ .build();
+ }
+
private ArgumentMatcher<ContainerCommandRequestProto> matchCmd(Type type) {
return argument -> argument != null && argument.getCmdType() == type;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]