[ https://issues.apache.org/jira/browse/HDDS-9844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856026#comment-17856026 ]
Duong edited comment on HDDS-9844 at 6/18/24 6:03 PM:
------------------------------------------------------
Discussed offline with [~weichiu] and [~smeng]. Logically, the hsync() and write() APIs
could look like this:
{code:java}
void hsync() {
  CompletableFuture<PutBlockResponse> putBlockResponseFuture = null;
  long lastCommitIndex = -1;
  synchronized (this) {
    // 1. If there is pending data in currentBuffer, create a writeChunk request for it.
    if (totalDataFlushedLength < writtenDataLength) {
      var writeChunk = getWriteChunkRequest(currentBuffer);
      // 2. Send the writeChunk request asynchronously.
      sendWriteChunkAsync(writeChunk);
      // 3. Reset currentBuffer to accommodate new writes.
      currentBuffer = reset(currentBuffer);
      // Everything written so far is now in flight.
      totalDataFlushedLength = writtenDataLength;
      // 4. Add the last writeChunk to the putBlock data and send putBlock.
      this.containerBlockData.add(writeChunk);
      putBlockResponseFuture = executePutBlockAsync();
    } else {
      lastCommitIndex = getLastPendingCommit();
    }
  }
  // The lock is released here, so the waits below do not block other callers.
  if (putBlockResponseFuture != null) {
    // 5. This hsync wrote new data: wait for the putBlock reply.
    PutBlockResponse putBlockResponse = putBlockResponseFuture.join();
    // 6. watchForCommit on the commit index returned by putBlock.
    watchForCommit(putBlockResponse.commitIndex);
  } else {
    // No new data: wait until lastCommitIndex has been committed.
  }
}

void write(byte[] data, int off, int len) {
  CompletableFuture<PutBlockResponse> putBlockResponseFuture = null;
  synchronized (this) {
    // 1. Write data to currentBuffer (assumes len fits in the remaining space).
    currentBuffer.put(data, off, len);
    writtenDataLength += len;
    // 2. If the buffer is full, send it as a writeChunk followed by a putBlock.
    if (currentBuffer.isFull()) {
      var writeChunk = getWriteChunkRequest(currentBuffer);
      sendWriteChunkAsync(writeChunk);
      currentBuffer = reset(currentBuffer);
      totalDataFlushedLength = writtenDataLength;
      this.containerBlockData.add(writeChunk);
      putBlockResponseFuture = executePutBlockAsync();
    }
  }
  // 3. Outside the lock, wait for the putBlock reply and its commit.
  if (putBlockResponseFuture != null) {
    PutBlockResponse putBlockResponse = putBlockResponseFuture.join();
    watchForCommit(putBlockResponse.commitIndex);
  }
}{code}
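The key idea in the pseudocode is that only buffer bookkeeping happens under the lock, while the wait for putBlock/watchForCommit happens outside it. Below is a minimal, self-contained sketch of that pattern under stated assumptions: `DesyncStreamSketch`, `simulatePutBlock`, the fixed RPC delay, and the single-threaded executor (standing in for in-order Ratis commits) are all hypothetical and are not Ozone APIs.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical, simplified model of a de-synchronized output stream.
 * putBlock is simulated by a delayed task on a single-threaded executor,
 * so putBlocks complete in submission (commit-index) order.
 */
public class DesyncStreamSketch implements AutoCloseable {
  private final ExecutorService rpc = Executors.newSingleThreadExecutor();
  private final long rpcDelayMillis;
  private final StringBuilder currentBuffer = new StringBuilder();
  private final StringBuilder committed = new StringBuilder();
  private long nextCommitIndex = 0;
  // Future of the most recent putBlock; completes with its commit index.
  private CompletableFuture<Long> lastPutBlock = CompletableFuture.completedFuture(0L);

  public DesyncStreamSketch(long rpcDelayMillis) {
    this.rpcDelayMillis = rpcDelayMillis;
  }

  /** Buffers data only; never waits on the (simulated) network. */
  public synchronized void write(String data) {
    currentBuffer.append(data);
  }

  /** Returns the commit index this caller waited for. */
  public long hsync() {
    CompletableFuture<Long> pending;
    synchronized (this) {
      if (currentBuffer.length() > 0) {
        // Copy the pending data so currentBuffer can be reused immediately.
        String chunk = currentBuffer.toString();
        currentBuffer.setLength(0);
        long index = ++nextCommitIndex;
        lastPutBlock = CompletableFuture.supplyAsync(
            () -> simulatePutBlock(chunk, index), rpc);
      }
      // Either the putBlock just sent, or the last one still pending.
      pending = lastPutBlock;
    }
    // Wait outside the lock: other write()/hsync() callers can proceed.
    return pending.join();
  }

  private long simulatePutBlock(String chunk, long index) {
    try {
      Thread.sleep(rpcDelayMillis); // simulated round trip and commit
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    synchronized (committed) {
      committed.append(chunk);
    }
    return index;
  }

  public String committedData() {
    synchronized (committed) {
      return committed.toString();
    }
  }

  @Override
  public void close() {
    rpc.shutdown();
  }
}
{code}

In this sketch write() never waits on the network, and an hsync() with no new data simply waits on the last pending putBlock, mirroring the else branch of the pseudocode. The single-threaded executor is a simplification: it makes commit indexes complete in order, which is what lets "wait for the latest putBlock" stand in for "wait until lastCommitIndex has passed".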
Implementation-wise, some points need more thought.
1. Usage of ChunkBuffer and BufferPool. When a thread creates a writeChunk
request from currentBuffer, a configuration option can enable unsafeWrap.
UnsafeWrap lets the writeChunk request content refer directly to the
currentBuffer memory instead of copying the data. This avoids allocating many
temporary buffers for writeChunk and thus avoids the extra GC cost. (With the
copying approach, GC cost grows linearly with the data written: O(GC) = O(written data).)
However, in a multithreaded context, after a thread creates a writeChunk
request from currentBuffer, that buffer must be reserved (locked) until the
writeChunk has been sent successfully; otherwise a concurrent write could
overwrite bytes the request still references. We need to define exactly how
ChunkBuffers are organized to accommodate a variable number of hsync() caller
threads.
2. The current implementation has quite a few misplaced abstractions and
responsibilities across components. E.g.
- CommitWatcher should not be managing the flushFutures (putBlock futures).
That belongs in the BlockOutputStream.
- The abstraction on top of ECBlockOutputStream and RatisBlockOutputStream is
incorrect.
- Much of the code's complexity comes (I think) from efforts to make it work
in multithreaded contexts, but it still isn't thread-safe. In the end, we have the
complexity of a multithreaded implementation with only the functionality of a
non-thread-safe one.
These issues make the code changes needed to de-synchronize the OutputStream
extensive. We can probably do some refactoring to clean them up before
proceeding with the actual code change.
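The buffer-reservation requirement from point 1 (a buffer whose bytes are referenced zero-copy by an in-flight writeChunk must not be reused until the send completes) can be sketched with plain JDK types. This is a hypothetical illustration, not Ozone's BufferPool/ChunkBuffer API: `ReservingBufferPool`, `tryAcquire`, `zeroCopyView`, and `releaseWhenDone` are made-up names, and a read-only `ByteBuffer` view stands in for unsafeWrap.

{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical pool illustrating buffer reservation for zero-copy sends.
 * Not Ozone's BufferPool/ChunkBuffer API.
 */
public class ReservingBufferPool {
  private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

  public ReservingBufferPool(int buffers, int bufferSize) {
    for (int i = 0; i < buffers; i++) {
      free.add(ByteBuffer.allocate(bufferSize));
    }
  }

  /** Returns a cleared free buffer, or null if every buffer is reserved by an in-flight send. */
  public synchronized ByteBuffer tryAcquire() {
    ByteBuffer buffer = free.poll();
    if (buffer != null) {
      buffer.clear();
    }
    return buffer;
  }

  /** Zero-copy, read-only view of the bytes written so far (stands in for unsafeWrap). */
  public static ByteBuffer zeroCopyView(ByteBuffer filled) {
    ByteBuffer view = filled.duplicate(); // shares content, independent position/limit
    view.flip();                          // expose [0, position) of the original
    return view.asReadOnlyBuffer();
  }

  /** Keeps the buffer reserved until the send referencing it completes (success or failure). */
  public void releaseWhenDone(ByteBuffer buffer, CompletableFuture<?> send) {
    send.whenComplete((result, error) -> release(buffer));
  }

  private synchronized void release(ByteBuffer buffer) {
    free.add(buffer);
  }

  public synchronized int freeCount() {
    return free.size();
  }
}
{code}

Because the view shares memory with the underlying buffer, a later write to that buffer would change what the in-flight request sends, which is why the buffer stays out of the pool until the send future completes. Tying the buffer's lifetime to the send future (rather than to the writing thread) lets any number of hsync() callers hand off buffers independently; what to do when tryAcquire() returns null (block, allocate more, or fall back to copying) is exactly the policy question point 1 leaves open.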
cc [~weichiu] [~smeng]. [~szetszwo], we need your opinions on the pseudocode
above and the buffer management.
> [hsync] De-synchronize hsync API
> --------------------------------
>
> Key: HDDS-9844
> URL: https://issues.apache.org/jira/browse/HDDS-9844
> Project: Apache Ozone
> Issue Type: Sub-task
> Reporter: Wei-Chiu Chuang
> Assignee: Siyao Meng
> Priority: Major
> Labels: pull-request-available
> Attachments: Screenshot 2024-02-27 at 8.01.48 PM.png
>
>
> The current hsync implementation, KeyOutputStream.hsync(), is wrapped in a
> synchronized block. The HBase write-ahead log (FSHLog.SyncRunner) has multiple
> threads invoking hsync in parallel to reduce client latency.
> We should de-synchronize Ozone's hsync so that it doesn't block while waiting for
> the Ratis transaction to respond.
> cc: [~szetszwo] this is what we discussed offline.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)