FMX commented on code in PR #3485:
URL: https://github.com/apache/celeborn/pull/3485#discussion_r2370830982
##########
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java:
##########
@@ -307,6 +266,93 @@ public ByteBuf next() throws Exception {
return chunk.getRight();
}
+ private void startAsyncFetch() {
+ CompletableFuture.runAsync(this::fetchChunks, fetchExecutor)
+ .exceptionally(
+ throwable -> {
+ logger.warn("Async fetch failed", throwable);
+ exception.set(new IOException(throwable));
+ return null;
+ });
+ }
+
+ private void fetchChunks() {
+ try {
+ while (!closed && currentChunkIndex <= endChunkIndex) {
+ if (partitionReaderCheckpointMetadata.isPresent()
+ &&
partitionReaderCheckpointMetadata.get().isCheckpointed(currentChunkIndex)) {
+ logger.info(
+ "Skipping chunk {} as it has already been returned,"
+ + " likely by a previous reader for the same partition.",
+ currentChunkIndex);
+ currentChunkIndex++;
Review Comment:
This is not an atomic operation; you will need to change it to a thread-safe
type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]