reswqa commented on code in PR #20371:
URL: https://github.com/apache/flink/pull/20371#discussion_r930867604
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java:
##########
@@ -97,24 +97,40 @@ public Optional<CompletableFuture<Void>> getSpilledFuture() {
return Optional.ofNullable(spilledFuture);
}
- public void release() {
- checkState(!released, "Release buffer repeatedly is unexpected.");
+ /**
+ * Mark buffer status to release.
+ *
+ * @return Whether the status has been modified successfully. If it has been released, false
+ * will be returned.
+ */
+ public boolean release() {
+ if (isReleased()) {
+ return false;
+ }
released = true;
// decrease ref count when buffer is released from memory.
buffer.recycleBuffer();
+ return true;
}
- public void startSpilling(CompletableFuture<Void> spilledFuture) {
- checkState(!released, "Buffer is already released.");
- checkState(
- !spillStarted && this.spilledFuture == null,
- "Spill buffer repeatedly is unexpected.");
+ /**
+ * Mark buffer status to startSpilling.
+ *
+ * @param spilledFuture completable future of this buffer's spilling operation.
+ * @return Whether the status has been modified successfully. If it has been released or is in
+ * startSpilling status, false will be returned.
+ */
+ public boolean startSpilling(CompletableFuture<Void> spilledFuture) {
+ if (isReleased() || isSpillStarted()) {
Review Comment:
It is possible that thread `A` decides to release the buffer, and another
thread `B` decides to start spilling before `A` marks this buffer as
released.
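
To illustrate the interleaving described above, here is a minimal sketch of why a boolean result is more forgiving than `checkState` when two threads race. `BufferStatusSketch` is a hypothetical stand-in, not the real `HsBufferContext`, and it assumes the two flags are guarded by the object's monitor, which this diff does not show. Whichever thread loses the race gets `false` back and can skip its work instead of throwing.

```java
/**
 * Hypothetical stand-in for a buffer context. It only models the two status
 * flags; it does not hold or recycle a real buffer.
 */
class BufferStatusSketch {
    private boolean released;
    private boolean spillStarted;

    /** Returns true only for the first caller that marks the buffer as released. */
    synchronized boolean release() {
        if (released) {
            return false;
        }
        released = true;
        return true;
    }

    /** Returns false if the buffer is already released or spilling has already started. */
    synchronized boolean startSpilling() {
        if (released || spillStarted) {
            return false;
        }
        spillStarted = true;
        return true;
    }

    public static void main(String[] args) {
        BufferStatusSketch ctx = new BufferStatusSketch();
        // Thread A's release lands first, so thread B's later attempt to
        // start spilling is rejected instead of failing a state check.
        boolean releasedByA = ctx.release();
        boolean spillStartedByB = ctx.startSpilling();
        System.out.println("released=" + releasedByA + ", spillStarted=" + spillStartedByB);
    }
}
```

With this shape, the caller that decided to spill checks the return value and simply drops the buffer from its spill list when it gets `false`.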