This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ae9e2476 [CELEBORN-878][FLINK] Convert all IOException to 
PartitionUnRetryAbleException when openStream/read file
4ae9e2476 is described below

commit 4ae9e2476fd3a8c81c5b4956cf615c3163d468e8
Author: Shuang <[email protected]>
AuthorDate: Tue Aug 8 16:15:51 2023 +0800

    [CELEBORN-878][FLINK] Convert all IOException to 
PartitionUnRetryAbleException when openStream/read file
    
    ### What changes were proposed in this pull request?
    1. Wrap every IOException in PartitionUnRetryAbleException when handling a fetch
    2. Improve error logging when opening a stream or reading data fails
    
    ### Why are the changes needed?
    When opening a stream, many different IOExceptions can be encountered, such 
as NoSuchFileException, FileNotFoundException, FileCorruptedException, etc. All 
of these checked exceptions should be wrapped in PartitionUnRetryAbleException 
so that the client can choose to regenerate the data.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT & Manual test
    
    Closes #1796 from RexXiong/CELEBORN-878-IO-Exception.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../plugin/flink/RemoteBufferStreamReader.java     |  7 ++++++
 .../flink/readclient/CelebornBufferStream.java     | 10 +++++++++
 .../celeborn/common/util/ExceptionUtils.java       |  9 ++------
 .../worker/storage/MapDataPartitionReader.java     | 11 +++++++---
 .../service/deploy/worker/FetchHandler.scala       | 25 ++++++++++++++--------
 5 files changed, 43 insertions(+), 19 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
index e4ee376bc..fcb85b571 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
@@ -27,6 +27,7 @@ import 
org.apache.celeborn.common.network.protocol.BacklogAnnouncement;
 import org.apache.celeborn.common.network.protocol.ReadAddCredit;
 import org.apache.celeborn.common.network.protocol.RequestMessage;
 import org.apache.celeborn.common.network.protocol.TransportableError;
+import org.apache.celeborn.common.network.util.NettyUtils;
 import org.apache.celeborn.plugin.flink.buffer.CreditListener;
 import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
 import org.apache.celeborn.plugin.flink.protocol.ReadData;
@@ -128,6 +129,12 @@ public class RemoteBufferStreamReader extends 
CreditListener {
   public void errorReceived(String errorMsg) {
     if (!closed) {
       closed = true;
+      if (bufferStream != null && bufferStream.getClient() != null) {
+        logger.error(
+            "Received error from {} message {}",
+            NettyUtils.getRemoteAddress(bufferStream.getClient().getChannel()),
+            errorMsg);
+      }
       failureListener.accept(new IOException(errorMsg));
     }
   }
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
index 5b0bcbe2c..415f3b2bb 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
@@ -30,6 +30,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.network.client.RpcResponseCallback;
 import org.apache.celeborn.common.network.client.TransportClient;
 import org.apache.celeborn.common.network.protocol.*;
+import org.apache.celeborn.common.network.util.NettyUtils;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory;
 
@@ -112,6 +113,11 @@ public class CelebornBufferStream {
 
           @Override
           public void onFailure(Throwable e) {
+            logger.error(
+                "Open file {} stream for {} error from {}",
+                fileName,
+                shuffleKey,
+                NettyUtils.getRemoteAddress(client.getChannel()));
             messageConsumer.accept(new TransportableError(streamId, e));
           }
         });
@@ -177,4 +183,8 @@ public class CelebornBufferStream {
       isClosed = true;
     }
   }
+
+  public TransportClient getClient() {
+    return client;
+  }
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
index 5b7b0a228..ae6c0422a 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -17,13 +17,11 @@
 
 package org.apache.celeborn.common.util;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
 import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.exception.FileCorruptedException;
 import org.apache.celeborn.common.exception.PartitionUnRetryAbleException;
 
 public class ExceptionUtils {
@@ -38,11 +36,8 @@ public class ExceptionUtils {
     }
   }
 
-  public static Throwable wrapIOExceptionToUnRetryable(
-      Throwable throwable, boolean convertAllIOException2UnRetryable) {
-    if (throwable instanceof FileNotFoundException || throwable instanceof 
FileCorruptedException) {
-      return new PartitionUnRetryAbleException(throwable.getMessage(), 
throwable);
-    } else if (throwable instanceof IOException && 
convertAllIOException2UnRetryable) {
+  public static Throwable wrapIOExceptionToUnRetryable(Throwable throwable) {
+    if (throwable instanceof IOException) {
       return new PartitionUnRetryAbleException(throwable.getMessage(), 
throwable);
     } else {
       return throwable;
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index d9fbdb5a3..4b0438460 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -39,6 +39,7 @@ import org.apache.celeborn.common.meta.FileInfo;
 import org.apache.celeborn.common.network.protocol.BacklogAnnouncement;
 import org.apache.celeborn.common.network.protocol.ReadData;
 import org.apache.celeborn.common.network.protocol.TransportableError;
+import org.apache.celeborn.common.network.util.NettyUtils;
 import org.apache.celeborn.common.util.ExceptionUtils;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
@@ -371,7 +372,12 @@ public class MapDataPartitionReader implements 
Comparable<MapDataPartitionReader
   }
 
   private void notifyError(Throwable throwable) {
-    logger.error("read error stream id {} message:{}", streamId, 
throwable.getMessage(), throwable);
+    logger.error(
+        "Read file: {} error from {}, stream id {}",
+        fileInfo.getFilePath(),
+        NettyUtils.getRemoteAddress(this.associatedChannel),
+        streamId,
+        throwable);
     if (throwable instanceof ClosedChannelException) {
       return;
     }
@@ -381,8 +387,7 @@ public class MapDataPartitionReader implements 
Comparable<MapDataPartitionReader
       // And do not close channel because multiple streams are using the very 
same channel.
       // wrapIOException to PartitionUnRetryAbleException, client may choose 
regenerate the data.
       this.associatedChannel.writeAndFlush(
-          new TransportableError(
-              streamId, ExceptionUtils.wrapIOExceptionToUnRetryable(throwable, 
true)));
+          new TransportableError(streamId, 
ExceptionUtils.wrapIOExceptionToUnRetryable(throwable)));
     }
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 45e700695..624c44655 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -95,7 +95,8 @@ class FetchHandler(val conf: CelebornConf, val transportConf: 
TransportConf)
         handleChunkFetchRequest(client, r)
       case r: RpcRequest =>
         // process PbOpenStream RPC
-        var timerShuffleKey: String = null
+        var streamShuffleKey: String = null
+        var streamFileName: String = null
         try {
           val pbMsg = TransportMessage.fromByteBuffer(r.body().nioByteBuffer())
           val pbOpenStream = pbMsg.getParsedPayload[PbOpenStream]
@@ -106,9 +107,9 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
               pbOpenStream.getStartIndex,
               pbOpenStream.getEndIndex,
               pbOpenStream.getInitialCredit)
-
-          timerShuffleKey = shuffleKey
-          workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, 
timerShuffleKey)
+          streamShuffleKey = shuffleKey
+          streamFileName = fileName
+          workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, 
streamShuffleKey)
           handleOpenStreamInternal(
             client,
             shuffleKey,
@@ -136,7 +137,8 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
                     new String(openStreamWithCredit.shuffleKey, 
StandardCharsets.UTF_8),
                     new String(openStreamWithCredit.fileName, 
StandardCharsets.UTF_8))
                 }
-              timerShuffleKey = shuffleKey
+              streamShuffleKey = shuffleKey
+              streamFileName = fileName
               var startIndex = 0
               var endIndex = 0
               var initialCredit = 0
@@ -161,11 +163,11 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
                 true)
             } catch {
               case e: IOException =>
-                handleRpcIOException(client, r.requestId, e)
+                handleRpcIOException(client, r.requestId, streamShuffleKey, 
streamFileName, e)
             }
         } finally {
           r.body().release()
-          workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, 
timerShuffleKey)
+          workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, 
streamShuffleKey)
         }
       case unknown: RequestMessage =>
         throw new IllegalArgumentException(s"Unknown message type id: 
${unknown.`type`.id}")
@@ -234,7 +236,7 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
       }
     } catch {
       case e: IOException =>
-        handleRpcIOException(client, request.requestId, e)
+        handleRpcIOException(client, request.requestId, shuffleKey, fileName, 
e)
     }
   }
 
@@ -261,13 +263,18 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
   private def handleRpcIOException(
       client: TransportClient,
       requestId: Long,
+      shuffleKey: String,
+      fileName: String,
       ioe: IOException): Unit = {
     // if open stream rpc failed, this IOException actually should be 
FileNotFoundException
     // we wrapper this IOException(Other place may have other exception like 
FileCorruptException) unify to
     // PartitionUnRetryableException for reader can give up this partition and 
choose to regenerate the partition data
+    logError(
+      s"Read file: $fileName with shuffleKey: $shuffleKey error from 
${NettyUtils.getRemoteAddress(client.getChannel)}",
+      ioe)
     client.getChannel.writeAndFlush(new RpcFailure(
       requestId,
-      
Throwables.getStackTraceAsString(ExceptionUtils.wrapIOExceptionToUnRetryable(ioe,
 false))))
+      
Throwables.getStackTraceAsString(ExceptionUtils.wrapIOExceptionToUnRetryable(ioe))))
   }
 
   def handleEndStreamFromClient(req: BufferStreamEnd): Unit = {

Reply via email to