ashishkumar50 commented on code in PR #5980:
URL: https://github.com/apache/ozone/pull/5980#discussion_r1531628026
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -211,6 +214,20 @@ public BlockOutputStream(
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
+ this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
+ allDataNodesSupportPiggybacking();
+ }
+
+ boolean allDataNodesSupportPiggybacking() {
Review Comment:
Make it private.
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -594,14 +608,24 @@ private void handleFlushInternal(boolean close)
if (totalDataFlushedLength < writtenDataLength) {
refreshCurrentBuffer();
Preconditions.checkArgument(currentBuffer.position() > 0);
- if (currentBuffer.hasRemaining()) {
- writeChunk(currentBuffer);
- }
+
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
- updateFlushLength();
- executePutBlock(close, false);
+ if (currentBuffer.hasRemaining()) {
+ if (writtenDataLength - totalDataFlushedLength < 100 * 1024 &&
Review Comment:
It would be better to define a named constant for 100 * 1024.
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -594,14 +608,24 @@ private void handleFlushInternal(boolean close)
if (totalDataFlushedLength < writtenDataLength) {
refreshCurrentBuffer();
Preconditions.checkArgument(currentBuffer.position() > 0);
- if (currentBuffer.hasRemaining()) {
- writeChunk(currentBuffer);
- }
+
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
- updateFlushLength();
- executePutBlock(close, false);
+ if (currentBuffer.hasRemaining()) {
+ if (writtenDataLength - totalDataFlushedLength < 100 * 1024 &&
+ allowPutBlockPiggybacking) {
+ updateFlushLength();
+ writeSmallChunk(currentBuffer);
+ } else {
+ writeChunk(currentBuffer);
+ updateFlushLength();
+ executePutBlock(close, false);
+ }
+ } else {
+ updateFlushLength();
+ executePutBlock(close, false);
Review Comment:
Is a PutBlock required here for the small-chunk case?
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -743,42 +769,95 @@ CompletableFuture<ContainerCommandResponseProto>
writeChunkToContainer(
+ ", previous = " + previous);
}
+ final List<ChunkBuffer> byteBufferList;
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ validateFuture = null;
try {
- XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
- blockID.get(), data, tokenString, replicationIndex);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- respFuture = asyncReply.getResponse();
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- validateFuture = respFuture.thenApplyAsync(e -> {
- try {
- validateResponse(e);
- } catch (IOException sce) {
- respFuture.completeExceptionally(sce);
- }
- return e;
- }, responseExecutor).exceptionally(e -> {
- String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
- " into block " + blockID;
- LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
- CompletionException ce = new CompletionException(msg, e);
- setIoException(ce);
- throw ce;
- });
+ BlockData blockData = null;
+
if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}
+ //containerBlockData.addChunks(chunkInfo);
Review Comment:
Remove the commented-out code.
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -743,42 +769,95 @@ CompletableFuture<ContainerCommandResponseProto>
writeChunkToContainer(
+ ", previous = " + previous);
}
+ final List<ChunkBuffer> byteBufferList;
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ validateFuture = null;
try {
- XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
- blockID.get(), data, tokenString, replicationIndex);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- respFuture = asyncReply.getResponse();
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- validateFuture = respFuture.thenApplyAsync(e -> {
- try {
- validateResponse(e);
- } catch (IOException sce) {
- respFuture.completeExceptionally(sce);
- }
- return e;
- }, responseExecutor).exceptionally(e -> {
- String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
- " into block " + blockID;
- LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
- CompletionException ce = new CompletionException(msg, e);
- setIoException(ce);
- throw ce;
- });
+ BlockData blockData = null;
+
if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}
+ //containerBlockData.addChunks(chunkInfo);
+ if (smallChunk) {
+ Preconditions.checkNotNull(bufferList);
+ byteBufferList = bufferList;
+ bufferList = null;
+ Preconditions.checkNotNull(byteBufferList);
+
+ blockData = containerBlockData.build();
+ LOG.debug("piggyback chunk list {}", blockData);
+
+ if (config.getIncrementalChunkList()) {
+ // remove any chunks in the containerBlockData list.
+ // since they are sent.
+ containerBlockData.clearChunks();
+ }
+ } else {
+ byteBufferList = null;
+ }
+ XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
+ blockID.get(), data, tokenString, replicationIndex, blockData);
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ respFuture = asyncReply.getResponse();
+ validateFuture = respFuture.thenApplyAsync(e -> {
+ try {
+ validateResponse(e);
+ } catch (IOException sce) {
+ respFuture.completeExceptionally(sce);
+ }
+ // if the ioException is not set, putBlock is successful
+ if (getIoException() == null && smallChunk) {
+ handleSuccessfulPutBlock(e.getWriteChunk().getCommittedBlockLength(),
+ asyncReply, flushPos, byteBufferList);
+ }
+ return e;
+ }, responseExecutor).exceptionally(e -> {
+ String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+ " into block " + blockID;
+ LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+ CompletionException ce = new CompletionException(msg, e);
+ setIoException(ce);
+ throw ce;
+ });
+ //containerBlockData.addChunks(chunkInfo);
Review Comment:
Remove the commented-out code.
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -743,42 +769,95 @@ CompletableFuture<ContainerCommandResponseProto>
writeChunkToContainer(
+ ", previous = " + previous);
}
+ final List<ChunkBuffer> byteBufferList;
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ validateFuture = null;
try {
- XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
- blockID.get(), data, tokenString, replicationIndex);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- respFuture = asyncReply.getResponse();
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- validateFuture = respFuture.thenApplyAsync(e -> {
- try {
- validateResponse(e);
- } catch (IOException sce) {
- respFuture.completeExceptionally(sce);
- }
- return e;
- }, responseExecutor).exceptionally(e -> {
- String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
- " into block " + blockID;
- LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
- CompletionException ce = new CompletionException(msg, e);
- setIoException(ce);
- throw ce;
- });
+ BlockData blockData = null;
+
if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}
+ //containerBlockData.addChunks(chunkInfo);
+ if (smallChunk) {
+ Preconditions.checkNotNull(bufferList);
+ byteBufferList = bufferList;
+ bufferList = null;
+ Preconditions.checkNotNull(byteBufferList);
+
+ blockData = containerBlockData.build();
+ LOG.debug("piggyback chunk list {}", blockData);
+
+ if (config.getIncrementalChunkList()) {
+ // remove any chunks in the containerBlockData list.
+ // since they are sent.
+ containerBlockData.clearChunks();
Review Comment:
Do we also need to remove the last chunk here — the one updated in
`updateBlockDataForWriteChunk` — in the `IncrementalChunkList` case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]