advancedxy commented on code in PR #428:
URL: https://github.com/apache/incubator-uniffle/pull/428#discussion_r1050337778
##########
common/src/test/java/org/apache/uniffle/common/GrpcServerTest.java:
##########
@@ -72,13 +72,14 @@ public void testGrpcExecutorPool() throws Exception {
});
}
- Thread.sleep(1000L);
+ Thread.sleep(100);
double activeThreads =
grpcMetrics.getGaugeMap().get(GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY).get();
assertEquals(2, activeThreads);
double queueSize =
grpcMetrics.getGaugeMap().get(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY).get();
assertEquals(1, queueSize);
countDownLatch.await();
+ Thread.sleep(100);
Review Comment:
This test is flaky; I fixed it here while I was at it.
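For reference, a fixed `Thread.sleep` only narrows the race window. A minimal sketch of polling for the expected state with a timeout instead; `AwaitUtil` and `awaitCondition` are hypothetical names, not part of this PR or of the Uniffle codebase:

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper (an assumption for illustration, not Uniffle code).
final class AwaitUtil {
  private AwaitUtil() {}

  // Polls `condition` every `pollMs` until it holds or `timeoutMs` elapses.
  // Returns true if the condition was observed to hold, false otherwise.
  static boolean awaitCondition(BooleanSupplier condition, long timeoutMs, long pollMs) {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (deadline - System.nanoTime() > 0) {
      if (condition.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    // Final check so a condition that became true during the last sleep is not missed.
    return condition.getAsBoolean();
  }
}
```

In a test like the one above, the gauge reads could be wrapped in such a call (for example, waiting until the active-thread gauge reports 2) before asserting, so the test waits only as long as it needs to.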
##########
server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java:
##########
@@ -115,7 +115,9 @@ public StatusCode cacheShuffleData(String appId, int shuffleId,
ShuffleBuffer buffer = entry.getValue();
long size = buffer.append(spd);
- updateSize(size, isPreAllocated);
+ if (!isPreAllocated) {
Review Comment:
Not exactly... Some UTs pass `isPreAllocated` as false.
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java:
##########
@@ -121,6 +124,11 @@ public ShuffleTaskManager(
leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
() -> checkLeakShuffleData(), leakShuffleDataCheckInterval,
leakShuffleDataCheckInterval, TimeUnit.MILLISECONDS);
+ this.triggerFlushExecutorService = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("triggerShuffleBufferManagerFlush"));
+ triggerFlushExecutorService.scheduleAtFixedRate(
Review Comment:
This is to reduce the UT changes needed.
After this PR, `cacheShuffleData` doesn't get a chance to flush memory when
there is only one call with preAllocated enabled; the memory is flushed in the
next round of `cacheShuffleData`.
However, a lot of UTs rely on the previous behavior.
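The idea of the periodic trigger can be sketched roughly as below. This is a simplified stand-in, not the code in this PR: `PeriodicFlushTrigger`, its constructor parameters, and the thread name are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a periodic flush trigger; all names are assumptions.
final class PeriodicFlushTrigger implements AutoCloseable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "triggerFlushSketch");
        t.setDaemon(true); // do not keep the JVM alive for the trigger
        return t;
      });

  // Runs `flushIfNeeded` every `intervalMs`, so buffered data is flushed
  // even when no further write arrives to trigger the check.
  PeriodicFlushTrigger(Runnable flushIfNeeded, long intervalMs) {
    executor.scheduleAtFixedRate(() -> {
      try {
        flushIfNeeded.run();
      } catch (RuntimeException e) {
        // An uncaught exception would cancel all future runs of a
        // scheduleAtFixedRate task, so it is swallowed in this sketch.
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

With something like this in place, a test that makes a single pre-allocated call still sees the flush happen after at most one interval, without needing a second call.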
##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java:
##########
@@ -226,26 +227,21 @@ public void sendShuffleData(SendShuffleDataRequest req,
return;
}
final long start = System.currentTimeMillis();
+ manager.markBufferInUse(requireBufferId);
Review Comment:
Well, this could happen, but in practice it should be extremely rare.
Currently our other cleaner logic is based on access time refreshing. This
approach is similar, except it doesn't require an instance lock to update the
timestamp field.
Even if the above case happens, the outcome would be releasing the
`usedMemory` size twice, and it is a safe bet that `usedMemory` will never
decrease to less than zero.
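To make the argument concrete, here is a minimal sketch of lock-free access-time refreshing combined with a release that is clamped at zero. It assumes the release path clamps, which is an assumption of this sketch and not something verified against the actual `ShuffleBufferManager`; `BufferAccountingSketch` and all of its members are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names are assumptions, not ShuffleBufferManager's API.
final class BufferAccountingSketch {
  private final AtomicLong usedMemory = new AtomicLong();
  // A volatile write publishes the timestamp without taking an instance lock.
  private volatile long lastAccessMs;

  // Refreshes the access time so a cleaner does not treat the buffer as expired.
  void markInUse(long nowMs) {
    lastAccessMs = nowMs;
  }

  boolean isExpired(long nowMs, long expireMs) {
    return nowMs - lastAccessMs > expireMs;
  }

  void require(long size) {
    usedMemory.addAndGet(size);
  }

  // Clamped at zero, so releasing the same size twice cannot go negative.
  long release(long size) {
    return usedMemory.updateAndGet(v -> Math.max(0L, v - size));
  }

  long usedMemory() {
    return usedMemory.get();
  }
}
```

Under these assumptions, a rare race between the cleaner and `sendShuffleData` costs at most a temporarily under-counted `usedMemory`, not a negative value.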
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]