lizhimins commented on code in PR #7281:
URL: https://github.com/apache/rocketmq/pull/7281#discussion_r1310012901
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java:
##########
@@ -333,99 +318,148 @@ public boolean commit() {
if (closed) {
return false;
}
+ // result is false when we send real commit request
+ // use join for wait flight request done
Boolean result = commitAsync().join();
if (!result) {
result = flightCommitRequest.join();
}
return result;
}
+ private void releaseCommitLock() {
+ if (commitLock.availablePermits() == 0) {
+ commitLock.release();
+ } else {
+ logger.error("[Bug] FileSegmentCommitAsync, lock is already
released: available permits: {}",
+ commitLock.availablePermits());
+ }
+ }
+
+ private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) {
+ if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
+ dispatchCommitOffset =
+
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
+ }
+ }
+
+ /**
+ * @return false: commit, true: no commit operation
+ */
@SuppressWarnings("NonAtomicOperationOnVolatileField")
public CompletableFuture<Boolean> commitAsync() {
if (closed) {
return CompletableFuture.completedFuture(false);
}
- Stopwatch stopwatch = Stopwatch.createStarted();
+
if (!needCommit()) {
return CompletableFuture.completedFuture(true);
}
- try {
- int permits = commitLock.drainPermits();
- if (permits <= 0) {
- return CompletableFuture.completedFuture(false);
- }
- } catch (Exception e) {
+
+ if (commitLock.drainPermits() <= 0) {
return CompletableFuture.completedFuture(false);
}
- List<ByteBuffer> bufferList = rollingUploadBuffer();
- int bufferSize = 0;
- for (ByteBuffer buffer : bufferList) {
- bufferSize += buffer.remaining();
- }
- if (codaBuffer != null) {
- bufferSize += codaBuffer.remaining();
- }
- if (bufferSize == 0) {
- return CompletableFuture.completedFuture(true);
- }
- TieredFileSegmentInputStream inputStream =
TieredFileSegmentInputStreamFactory.build(
- fileType, baseOffset + commitPosition, bufferList, codaBuffer,
bufferSize);
- int finalBufferSize = bufferSize;
+
try {
- flightCommitRequest = commit0(inputStream, commitPosition,
bufferSize, fileType != FileSegmentType.INDEX)
+ if (fileSegmentInputStream != null) {
+ if (correctPosition(this.getSize(), null)) {
+
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ fileSegmentInputStream = null;
+ } else {
+ fileSegmentInputStream.rewind();
+ }
+ }
+
+ int bufferSize;
+ if (fileSegmentInputStream != null) {
+ bufferSize = fileSegmentInputStream.available();
+ } else {
+ List<ByteBuffer> bufferList = borrowBuffer();
+ bufferSize =
bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
+ + (codaBuffer != null ? codaBuffer.remaining() : 0);
+ if (bufferSize == 0) {
+ releaseCommitLock();
+ return CompletableFuture.completedFuture(true);
+ }
+ fileSegmentInputStream = FileSegmentInputStreamFactory.build(
+ fileType, baseOffset + commitPosition, bufferList,
codaBuffer, bufferSize);
+ }
+
+ return flightCommitRequest = this
+ .commit0(fileSegmentInputStream, commitPosition, bufferSize,
fileType != FileSegmentType.INDEX)
.thenApply(result -> {
if (result) {
- if (fileType == FileSegmentType.COMMIT_LOG &&
bufferList.size() > 0) {
- dispatchCommitOffset =
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
- }
- commitPosition += finalBufferSize;
+
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ commitPosition += bufferSize;
+ fileSegmentInputStream = null;
return true;
- }
- sendBackBuffer(inputStream);
- return false;
- })
- .exceptionally(e -> handleCommitException(inputStream, e))
- .whenComplete((result, e) -> {
- if (commitLock.availablePermits() == 0) {
- logger.debug("TieredFileSegment#commitAsync: commit
cost: {}ms, file: {}, item count: {}, buffer size: {}",
stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(),
finalBufferSize);
- commitLock.release();
} else {
- logger.error("[Bug]TieredFileSegment#commitAsync:
commit lock is already released: available permits: {}",
commitLock.availablePermits());
+ fileSegmentInputStream.rewind();
+ return false;
}
- });
- return flightCommitRequest;
+ })
+ .exceptionally(this::handleCommitException)
+ .whenComplete((result, e) -> releaseCommitLock());
+
} catch (Exception e) {
- handleCommitException(inputStream, e);
- if (commitLock.availablePermits() == 0) {
- logger.debug("TieredFileSegment#commitAsync: commit cost:
{}ms, file: {}, item count: {}, buffer size: {}",
stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(),
finalBufferSize);
- commitLock.release();
- } else {
- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock
is already released: available permits: {}", commitLock.availablePermits());
- }
+ handleCommitException(e);
+ releaseCommitLock();
Review Comment:
Actually not the same.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]