This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.2 by this push:
     new a63e4ca1 [CELEBORN-212] refresh client if current client is inactive. (#1159)
a63e4ca1 is described below

commit a63e4ca168773730b04b4c860da43a818decfbd2
Author: Shuang <[email protected]>
AuthorDate: Wed Jan 11 11:54:50 2023 +0800

    [CELEBORN-212] refresh client if current client is inactive. (#1159)
---
 .../client/read/WorkerPartitionReader.java         | 23 ++++++++++++----
 .../celeborn/common/util/ExceptionUtils.java       | 31 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index 9d154f58..2a9bae7b 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -38,11 +38,12 @@ import org.apache.celeborn.common.network.protocol.Message;
 import org.apache.celeborn.common.network.protocol.OpenStream;
 import org.apache.celeborn.common.network.protocol.StreamHandle;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.ExceptionUtils;
 
 public class WorkerPartitionReader implements PartitionReader {
   private final Logger logger = 
LoggerFactory.getLogger(WorkerPartitionReader.class);
   private PartitionLocation location;
-  private final TransportClient client;
+  private final TransportClientFactory clientFactory;
   private StreamHandle streamHandle;
 
   private int returnedChunks;
@@ -94,6 +95,7 @@ public class WorkerPartitionReader implements PartitionReader {
             exception.set(new IOException(errorMsg, e));
           }
         };
+    TransportClient client;
     try {
       client = clientFactory.createClient(location.getHost(), 
location.getFetchPort());
     } catch (InterruptedException ie) {
@@ -106,7 +108,7 @@ public class WorkerPartitionReader implements PartitionReader {
     streamHandle = (StreamHandle) Message.decode(response);
 
     this.location = location;
-
+    this.clientFactory = clientFactory;
     this.fetchChunkRetryCnt = fetchChunkRetryCnt;
     this.fetchChunkMaxRetry = fetchChunkMaxRetry;
     testFetch = conf.testFetchFailure();
@@ -152,7 +154,7 @@ public class WorkerPartitionReader implements PartitionReader {
     return location;
   }
 
-  private void fetchChunks() {
+  private void fetchChunks() throws IOException {
     final int inFlight = chunkIndex - returnedChunks;
     if (inFlight < fetchMaxReqsInFlight) {
       final int toFetch =
@@ -161,8 +163,19 @@ public class WorkerPartitionReader implements PartitionReader {
         if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && 
chunkIndex == 3) {
           callback.onFailure(chunkIndex, new IOException("Test fetch chunk 
failure"));
         } else {
-          client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
-          chunkIndex++;
+          try {
+            TransportClient client =
+                clientFactory.createClient(location.getHost(), 
location.getFetchPort());
+            client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+            chunkIndex++;
+          } catch (IOException | InterruptedException e) {
+            logger.error(
+                "fetchChunk for streamId: {}, chunkIndex: {} failed.",
+                streamHandle.streamId,
+                chunkIndex,
+                e);
+            ExceptionUtils.wrapAndThrowIOException(e);
+          }
         }
       }
     }
diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
new file mode 100644
index 00000000..5336f6d1
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util;
+
+import java.io.IOException;
+
+/** Static helpers for converting arbitrary exceptions into {@link IOException}s. */
+public class ExceptionUtils {
+
+  private ExceptionUtils() {}
+
+  /**
+   * Rethrows {@code exception} as an {@link IOException}; this method never returns normally.
+   *
+   * <p>An {@link IOException} is rethrown as-is. Any other exception is wrapped in a new {@link
+   * IOException} carrying the same message and the original exception as its cause. When the
+   * argument is an {@link InterruptedException}, the current thread's interrupt status is
+   * restored first, since wrapping would otherwise hide the interruption from callers.
+   *
+   * @param exception the exception to rethrow
+   * @throws IOException always
+   * @throws NullPointerException if {@code exception} is null
+   */
+  public static void wrapAndThrowIOException(Exception exception) throws IOException {
+    if (exception instanceof IOException) {
+      throw (IOException) exception;
+    }
+    if (exception instanceof InterruptedException) {
+      // Re-assert the interrupt so callers up the stack can still observe it.
+      Thread.currentThread().interrupt();
+    }
+    throw new IOException(exception.getMessage(), exception);
+  }
+}

Reply via email to