This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new db148be32 [CELEBORN-1510] Partial task unable to switch to the replica
db148be32 is described below
commit db148be32fd9c0675cc117fc1e3c8120757aa780
Author: zhangzhao.08 <[email protected]>
AuthorDate: Tue Dec 17 15:13:14 2024 +0800
[CELEBORN-1510] Partial task unable to switch to the replica
Tasks whose attemptNumber % 2 == 1 are unable to switch to the replica.
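When attemptNumber % 2 == 1 (i.e. for any odd attempt), createReader always
replaces the given location with location.peer, including when it is called
again during a retry. This leads to unexpected behavior in some cases: for
example, if an exception occurs while fetching data and the reader needs to
fall back to the peer, createReader swaps the location back, so the retry is
sent to the faulty node again. This patch moves the peer selection into
createReaderWithRetry and applies it only before the first attempt
(fetchChunkRetryCnt == 0).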
<img width="1626" alt="image"
src="https://github.com/user-attachments/assets/21c50953-db0f-4717-9b91-e3aeae16ece2">
<img width="1652" alt="image"
src="https://github.com/user-attachments/assets/7430786c-26a4-4b3b-be68-f8bdf780c58c">
Does this PR introduce any user-facing change? No.
How was this patch tested? Internal test.
Closes #2626 from zhaostu4/switch_replica.
Authored-by: zhangzhao.08 <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit b24f8677844279a1882ad946e05e5483baec1aa3)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/client/read/CelebornInputStream.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 4c517eebc..a0651219a 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -332,6 +332,11 @@ public abstract class CelebornInputStream extends InputStream {
}
private PartitionReader createReaderWithRetry(PartitionLocation location)
throws IOException {
+ // For the first time, the location will be selected according to
attemptNumber
+ if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 &&
location.hasPeer()) {
+ location = location.getPeer();
+ logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
+ }
Exception lastException = null;
while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
try {
@@ -423,12 +428,6 @@ public abstract class CelebornInputStream extends InputStream {
private PartitionReader createReader(
PartitionLocation location, int fetchChunkRetryCnt, int
fetchChunkMaxRetry)
throws IOException, InterruptedException {
- if (!location.hasPeer()) {
- logger.debug("Partition {} has only one partition replica.", location);
- } else if (attemptNumber % 2 == 1) {
- location = location.getPeer();
- logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
- }
logger.debug("Create reader for location {}", location);
StorageInfo storageInfo = location.getStorageInfo();