xintongsong commented on code in PR #21209:
URL: https://github.com/apache/flink/pull/21209#discussion_r1014058947
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java:
##########
@@ -128,28 +128,32 @@ private void checkRelease(
return;
}
- int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio);
+ int releaseNum = (int) (poolSize * releaseBufferRatio);
int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
- int subpartitionSurvivedNum = survivedNum / numSubpartitions;
-
+ int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions;
TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new
TreeMap<>();
for (int subpartitionId = 0; subpartitionId < numSubpartitions;
subpartitionId++) {
Deque<BufferIndexAndChannel> buffersInOrder =
spillingInfoProvider.getBuffersInOrder(
subpartitionId, SpillStatus.SPILL,
ConsumeStatusWithId.ALL_ANY);
- // if the number of subpartition buffers less than survived
buffers, reserved all of
- // them.
- int releaseNum = Math.max(0, buffersInOrder.size() -
subpartitionSurvivedNum);
- while (releaseNum-- != 0) {
+ // if the number of subpartition spilling buffers less than
expected release number,
+ // release all of them.
+ int subpartitionReleaseNum =
+ Math.min(buffersInOrder.size(),
expectedSubpartitionReleaseNum);
+ int subpartitionSurvivedNum = buffersInOrder.size() -
subpartitionReleaseNum;
+ while (subpartitionSurvivedNum-- != 0) {
buffersInOrder.pollLast();
}
bufferToRelease.put(subpartitionId, buffersInOrder);
}
// collect results in order
for (int i = 0; i < numSubpartitions; i++) {
- builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new
ArrayDeque<>()));
+ Deque<BufferIndexAndChannel> bufferIndexAndChannels =
bufferToRelease.get(i);
+ if (bufferIndexAndChannels != null &&
!bufferIndexAndChannels.isEmpty()) {
+ builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i,
new ArrayDeque<>()));
+ }
Review Comment:
I don't quite understand the difference between calculating the survived
vs. release numbers. What's the purpose?
It seems that before this change there was a bug: we were releasing the
number of survived buffers rather than the number of buffers to release.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java:
##########
@@ -149,9 +147,9 @@ void testDecideActionWithGlobalInfo() {
Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new
HashMap<>();
expectedReleaseBuffers.put(
- subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0,
2)));
+ subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0,
3)));
expectedReleaseBuffers.put(
- subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1,
3)));
+ subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1,
4)));
Review Comment:
Please also check `createBufferIndexAndChannelsList`. It creates a memory
segment and a network buffer that are never needed.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java:
##########
@@ -149,9 +147,9 @@ void testDecideActionWithGlobalInfo() {
Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new
HashMap<>();
expectedReleaseBuffers.put(
- subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0,
2)));
+ subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0,
3)));
expectedReleaseBuffers.put(
- subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1,
3)));
+ subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1,
4)));
Review Comment:
I think this test case no longer needs the progress values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]