This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new a86732890d HDDS-11014. [hsync] Block finalization should also merge
last chunk to blockDataTable. (#6847)
a86732890d is described below
commit a86732890d0721f529bc805c5d59ae1eb78a410f
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Jun 25 10:41:14 2024 -0700
HDDS-11014. [hsync] Block finalization should also merge last chunk to
blockDataTable. (#6847)
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 12 ++++++------
.../hadoop/hdds/scm/storage/ECBlockOutputStream.java | 4 ++--
.../hadoop/hdds/scm/storage/ContainerProtocolCalls.java | 6 ++++--
.../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 6 ++----
.../ozone/container/keyvalue/impl/BlockManagerImpl.java | 16 ++++++++++++++++
5 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index c1f92c8337..2d40439c33 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -573,13 +573,13 @@ public class BlockOutputStream extends OutputStream {
private void writeChunk(ChunkBuffer buffer)
throws IOException {
writeChunkCommon(buffer);
- writeChunkToContainer(buffer.duplicate(0, buffer.position()), false);
+ writeChunkToContainer(buffer.duplicate(0, buffer.position()), false, false);
}
- private void writeChunkAndPutBlock(ChunkBuffer buffer)
+ private void writeChunkAndPutBlock(ChunkBuffer buffer, boolean close)
throws IOException {
writeChunkCommon(buffer);
- writeChunkToContainer(buffer.duplicate(0, buffer.position()), true);
+ writeChunkToContainer(buffer.duplicate(0, buffer.position()), true, close);
}
/**
@@ -618,7 +618,7 @@ public class BlockOutputStream extends OutputStream {
if (currentBuffer.hasRemaining()) {
if (allowPutBlockPiggybacking) {
updateFlushLength();
- writeChunkAndPutBlock(currentBuffer);
+ writeChunkAndPutBlock(currentBuffer, close);
} else {
writeChunk(currentBuffer);
updateFlushLength();
@@ -743,7 +743,7 @@ public class BlockOutputStream extends OutputStream {
* @return
*/
CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
- ChunkBuffer chunk, boolean putBlockPiggybacking) throws IOException {
+ ChunkBuffer chunk, boolean putBlockPiggybacking, boolean close) throws IOException {
int effectiveChunkSize = chunk.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
final ByteString data = chunk.toByteString(
@@ -805,7 +805,7 @@ public class BlockOutputStream extends OutputStream {
}
XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
- blockID.get(), data, tokenString, replicationIndex, blockData);
+ blockID.get(), data, tokenString, replicationIndex, blockData, close);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
respFuture = asyncReply.getResponse();
validateFuture = respFuture.thenApplyAsync(e -> {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index a57ae74f99..843727cef2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -90,13 +90,13 @@ public class ECBlockOutputStream extends BlockOutputStream {
public void write(byte[] b, int off, int len) throws IOException {
this.currentChunkRspFuture =
writeChunkToContainer(
- ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false);
+ ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false, false);
updateWrittenDataLength(len);
}
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
write(
ByteBuffer buff) throws IOException {
- return writeChunkToContainer(ChunkBuffer.wrap(buff), false);
+ return writeChunkToContainer(ChunkBuffer.wrap(buff), false, false);
}
public CompletableFuture<ContainerProtos.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 37cc075f21..41d255ef07 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -436,10 +436,11 @@ public final class ContainerProtocolCalls {
* @param tokenString serialized block token
* @throws IOException if there is an I/O error while performing the call
*/
+ @SuppressWarnings("parameternumber")
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data, String tokenString,
- int replicationIndex, BlockData blockData)
+ int replicationIndex, BlockData blockData, boolean close)
throws IOException, ExecutionException, InterruptedException {
WriteChunkRequestProto.Builder writeChunkRequest =
@@ -455,7 +456,8 @@ public final class ContainerProtocolCalls {
if (blockData != null) {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
- .setBlockData(blockData);
+ .setBlockData(blockData)
+ .setEof(close);
writeChunkRequest.setBlock(createBlockRequest);
}
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 05979d85fa..440e7f4385 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -877,13 +877,11 @@ public class KeyValueHandler extends Handler {
// block metadata is piggybacked in the same message.
// there will not be an additional PutBlock request.
//
- // End of block will always be sent as a standalone PutBlock.
- // the PutBlock piggybacked in WriteChunk is never end of block.
- //
// do not do this in WRITE_DATA phase otherwise PutBlock will be out
// of order.
blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
- blockManager.putBlock(kvContainer, blockData, false);
+ boolean eob = writeChunk.getBlock().getEof();
+ blockManager.putBlock(kvContainer, blockData, eob);
blockDataProto = blockData.getProtoBufMessage();
final long numBytes = blockDataProto.getSerializedSize();
metrics.incContainerBytesStats(Type.PutBlock, numBytes);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 27a40400b7..ceedcb94cf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -244,10 +244,26 @@ public class BlockManagerImpl implements BlockManager {
db.getStore().getFinalizeBlocksTable().putWithBatch(batch,
kvContainer.getContainerData().getBlockKey(localID), localID);
db.getStore().getBatchHandler().commitBatchOperation(batch);
+
+ mergeLastChunkForBlockFinalization(blockId, db, kvContainer, batch, localID);
}
}
}
+ private void mergeLastChunkForBlockFinalization(BlockID blockId, DBHandle db,
+ KeyValueContainer kvContainer, BatchOperation batch,
+ long localID) throws IOException {
+ // if the chunk list of the block to be finalized was written incremental,
+ // merge the last chunk into block data.
+ BlockData blockData = getBlockByID(db, blockId, kvContainer.getContainerData());
+ if (blockData.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST)) {
+ BlockData emptyBlockData = new BlockData(blockId);
+ emptyBlockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+ db.getStore().putBlockByID(batch, incrementalEnabled, localID,
+ emptyBlockData, kvContainer.getContainerData(), true);
+ }
+ }
+
@Override
public BlockData getBlock(Container container, BlockID blockID)
throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]