This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a90ac8b6 [CELEBORN-1036] Map task hangs at limitZeroInFlight due to 
duplicate onFailure called
9a90ac8b6 is described below

commit 9a90ac8b6d9d86daaacc5474ce1a6b48262f21c2
Author: onebox-li <[email protected]>
AuthorDate: Fri Oct 13 20:13:32 2023 +0800

    [CELEBORN-1036] Map task hangs at limitZeroInFlight due to duplicate 
onFailure called
    
    ### What changes were proposed in this pull request?
    In our test jobs, we found few map tasks may hang at 
InFlightRequestTracker#limitZeroInFlight (both
     prepareForMergeData and mapEndInternal can occurs) when worker unexpected 
shutdown. We add logs to trace InFlightRequestTracker#totalInflightReqs and 
found this adder may become negative In the above case.
    
    When worker suddenly shutdown, the channel connection raise exception.
    If NioEventLoop.processSelectedKeys is doing read, the exceptionCaught will 
be called. In TransportResponseHandler#exceptionCaught will call 
failOutstandingRequests and each request‘s onFailure callback.
    ```
    WARN [data-client-5-9] TransportChannelHandler: Exception in connection 
from /xx
    java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:192)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at 
org.apache.celeborn.shaded.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:256)
            at 
org.apache.celeborn.shaded.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
            at 
org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:745)
    ERROR [data-client-5-9] ShuffleClientImpl: Push data to xx failed for 
shuffle 0 map 11 attempt 0 partition 178 batch 634, remain revive times 5.
    ```
    Next NioEventLoop start to `runAllTasks` in the finally block.If there is 
push write task, PushChannelListener.handleFailure will be called because of 
the closing channel. Here callback.onFailure may have a data race on 
`outstandingPushes`.
    ```
    ERROR [data-client-5-9] ShuffleClientImpl: Push data to xx failed for 
shuffle 0 map 11 attempt 0 partition 178 batch 634, remain revive times 4.
    org.apache.celeborn.common.exception.CelebornIOException: Failed to send 
request PUSH 1264 to /xx: 
org.apache.celeborn.shaded.io.netty.channel.StacklessClosedChannelException, 
channel will be closed
            at 
org.apache.celeborn.common.network.client.TransportClient$PushChannelListener.handleFailure(TransportClient.java:382)
            at 
org.apache.celeborn.common.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:325)
            at 
org.apache.celeborn.common.network.client.TransportClient$PushChannelListener.operationComplete(TransportClient.java:373)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:999)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:860)
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:877)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
            at 
org.apache.celeborn.shaded.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:113)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
            at 
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: 
org.apache.celeborn.shaded.io.netty.channel.StacklessClosedChannelException
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
 ChannelPromise)(Unknown Source)
    ```
    
    Duplicate callback.onFailure will lead to totalInflightReqs count exception.
    
    Here race will not be too severe and only occur under exception situation. 
So I think synchronize a lock is enough to avoid race.
    
    ### Why are the changes needed?
    Increase robustness.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1978 from onebox-li/fix-handle-channel-failure.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../common/network/client/TransportClient.java     |  29 ++----
 .../network/client/TransportResponseHandler.java   | 113 ++++++++++++++++-----
 2 files changed, 96 insertions(+), 46 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index edf5a45ac..151895607 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -35,7 +35,6 @@ import io.netty.util.concurrent.GenericFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.celeborn.common.exception.CelebornIOException;
 import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
 import org.apache.celeborn.common.network.protocol.*;
 import org.apache.celeborn.common.network.util.NettyUtils;
@@ -132,8 +131,7 @@ public class TransportClient implements Closeable {
         new StdChannelListener(streamChunkSlice) {
           @Override
           protected void handleFailure(String errorMsg, Throwable cause) {
-            handler.removeFetchRequest(streamChunkSlice);
-            callback.onFailure(chunkIndex, new IOException(errorMsg, cause));
+            handler.handleFetchFailure(streamChunkSlice, errorMsg, cause);
           }
         };
 
@@ -162,7 +160,7 @@ public class TransportClient implements Closeable {
     long requestId = requestId();
     handler.addRpcRequest(requestId, callback);
 
-    RpcChannelListener listener = new RpcChannelListener(requestId, callback);
+    RpcChannelListener listener = new RpcChannelListener(requestId);
     channel
         .writeAndFlush(new RpcRequest(requestId, new 
NioManagedBuffer(message)))
         .addListener(listener);
@@ -204,7 +202,7 @@ public class TransportClient implements Closeable {
     PushRequestInfo info = new PushRequestInfo(dueTime, callback);
     handler.addPushRequest(requestId, info);
     pushData.requestId = requestId;
-    PushChannelListener listener = new PushChannelListener(requestId, 
callback, rpcSendoutCallback);
+    PushChannelListener listener = new PushChannelListener(requestId, 
rpcSendoutCallback);
     ChannelFuture channelFuture = 
channel.writeAndFlush(pushData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -222,7 +220,7 @@ public class TransportClient implements Closeable {
     handler.addPushRequest(requestId, info);
     pushMergedData.requestId = requestId;
 
-    PushChannelListener listener = new PushChannelListener(requestId, 
callback);
+    PushChannelListener listener = new PushChannelListener(requestId);
     ChannelFuture channelFuture = 
channel.writeAndFlush(pushMergedData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -351,35 +349,29 @@ public class TransportClient implements Closeable {
 
   private class RpcChannelListener extends StdChannelListener {
     final long rpcRequestId;
-    final RpcResponseCallback callback;
 
-    RpcChannelListener(long rpcRequestId, RpcResponseCallback callback) {
+    RpcChannelListener(long rpcRequestId) {
       super("RPC " + rpcRequestId);
       this.rpcRequestId = rpcRequestId;
-      this.callback = callback;
     }
 
     @Override
     protected void handleFailure(String errorMsg, Throwable cause) {
-      handler.removeRpcRequest(rpcRequestId);
-      callback.onFailure(new IOException(errorMsg, cause));
+      handler.handleRpcFailure(rpcRequestId, errorMsg, cause);
     }
   }
 
   private class PushChannelListener extends StdChannelListener {
     final long pushRequestId;
-    final RpcResponseCallback callback;
     Runnable rpcSendOutCallback;
 
-    PushChannelListener(long pushRequestId, RpcResponseCallback callback) {
-      this(pushRequestId, callback, null);
+    PushChannelListener(long pushRequestId) {
+      this(pushRequestId, null);
     }
 
-    PushChannelListener(
-        long pushRequestId, RpcResponseCallback callback, Runnable 
rpcSendOutCallback) {
+    PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
       super("PUSH " + pushRequestId);
       this.pushRequestId = pushRequestId;
-      this.callback = callback;
       this.rpcSendOutCallback = rpcSendOutCallback;
     }
 
@@ -393,8 +385,7 @@ public class TransportClient implements Closeable {
 
     @Override
     protected void handleFailure(String errorMsg, Throwable cause) {
-      handler.removePushRequest(pushRequestId);
-      callback.onFailure(new CelebornIOException(errorMsg, cause));
+      handler.handlePushFailure(pushRequestId, errorMsg, cause);
     }
   }
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 09f6fe893..ddace8d87 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -54,9 +54,9 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
   private final TransportConf conf;
   private final Channel channel;
 
-  private final Map<StreamChunkSlice, FetchRequestInfo> outstandingFetches;
+  private final ConcurrentHashMap<StreamChunkSlice, FetchRequestInfo> 
outstandingFetches;
 
-  private final Map<Long, RpcResponseCallback> outstandingRpcs;
+  private final ConcurrentHashMap<Long, RpcResponseCallback> outstandingRpcs;
   private final ConcurrentHashMap<Long, PushRequestInfo> outstandingPushes;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC 
request was sent. */
@@ -132,7 +132,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     while (iter.hasNext()) {
       Map.Entry<Long, PushRequestInfo> entry = iter.next();
       if (entry.getValue().dueTime <= currentTime) {
-        PushRequestInfo info = outstandingPushes.remove(entry.getKey());
+        PushRequestInfo info = removePushRequest(entry.getKey());
         if (info != null) {
           if (info.channelFuture != null) {
             info.channelFuture.cancel(true);
@@ -158,7 +158,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     while (iter.hasNext()) {
       Map.Entry<StreamChunkSlice, FetchRequestInfo> entry = iter.next();
       if (entry.getValue().dueTime <= currentTime) {
-        FetchRequestInfo info = outstandingFetches.remove(entry.getKey());
+        FetchRequestInfo info = removeFetchRequest(entry.getKey());
         if (info != null) {
           if (info.channelFuture != null) {
             info.channelFuture.cancel(true);
@@ -186,8 +186,8 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     outstandingFetches.put(streamChunkSlice, info);
   }
 
-  public void removeFetchRequest(StreamChunkSlice streamChunkSlice) {
-    outstandingFetches.remove(streamChunkSlice);
+  public FetchRequestInfo removeFetchRequest(StreamChunkSlice 
streamChunkSlice) {
+    return outstandingFetches.remove(streamChunkSlice);
   }
 
   public void addRpcRequest(long requestId, RpcResponseCallback callback) {
@@ -198,8 +198,8 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     outstandingRpcs.put(requestId, callback);
   }
 
-  public void removeRpcRequest(long requestId) {
-    outstandingRpcs.remove(requestId);
+  public RpcResponseCallback removeRpcRequest(long requestId) {
+    return outstandingRpcs.remove(requestId);
   }
 
   public void addPushRequest(long requestId, PushRequestInfo info) {
@@ -210,8 +210,8 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     outstandingPushes.put(requestId, info);
   }
 
-  public void removePushRequest(long requestId) {
-    outstandingPushes.remove(requestId);
+  public PushRequestInfo removePushRequest(long requestId) {
+    return outstandingPushes.remove(requestId);
   }
 
   /**
@@ -219,32 +219,44 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
    * exception or pre-mature connection termination.
    */
   private void failOutstandingRequests(Throwable cause) {
-    for (Map.Entry<StreamChunkSlice, FetchRequestInfo> entry : 
outstandingFetches.entrySet()) {
+    Iterator<StreamChunkSlice> fetchRequestIter = 
outstandingFetches.keySet().iterator();
+    while (fetchRequestIter.hasNext()) {
       try {
-        entry.getValue().callback.onFailure(entry.getKey().chunkIndex, cause);
+        StreamChunkSlice slice = fetchRequestIter.next();
+        FetchRequestInfo info = removeFetchRequest(slice);
+        if (info != null) {
+          info.callback.onFailure(slice.chunkIndex, cause);
+        }
       } catch (Exception e) {
         logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
       }
     }
-    for (Map.Entry<Long, RpcResponseCallback> entry : 
outstandingRpcs.entrySet()) {
+
+    Iterator<Long> rpcRequestIter = outstandingRpcs.keySet().iterator();
+    while (rpcRequestIter.hasNext()) {
       try {
-        entry.getValue().onFailure(cause);
+        long requestId = rpcRequestIter.next();
+        RpcResponseCallback callback = removeRpcRequest(requestId);
+        if (callback != null) {
+          callback.onFailure(cause);
+        }
       } catch (Exception e) {
         logger.warn("RpcResponseCallback.onFailure throws exception", e);
       }
     }
-    for (Map.Entry<Long, PushRequestInfo> entry : 
outstandingPushes.entrySet()) {
+
+    Iterator<Long> pushRequestIter = outstandingPushes.keySet().iterator();
+    while (pushRequestIter.hasNext()) {
       try {
-        entry.getValue().callback.onFailure(cause);
+        long requestId = pushRequestIter.next();
+        PushRequestInfo info = removePushRequest(requestId);
+        if (info != null) {
+          info.callback.onFailure(cause);
+        }
       } catch (Exception e) {
         logger.warn("RpcResponseCallback.onFailure throws exception", e);
       }
     }
-
-    // It's OK if new fetches appear, as they will fail immediately.
-    outstandingFetches.clear();
-    outstandingRpcs.clear();
-    outstandingPushes.clear();
   }
 
   @Override
@@ -302,7 +314,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (message instanceof ChunkFetchSuccess) {
       ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
       logger.debug("Chunk {} fetch succeeded", resp.streamChunkSlice);
-      FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
+      FetchRequestInfo info = removeFetchRequest(resp.streamChunkSlice);
       if (info == null) {
         logger.warn(
             "Ignoring response for block {} from {} since it is not 
outstanding",
@@ -320,7 +332,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       ChunkFetchFailure resp = (ChunkFetchFailure) message;
       logger.error(
           "chunk {} fetch failed, errorMessage {}", resp.streamChunkSlice, 
resp.errorString);
-      FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
+      FetchRequestInfo info = removeFetchRequest(resp.streamChunkSlice);
       if (info == null) {
         logger.warn(
             "Ignoring response for block {} from {} ({}) since it is not 
outstanding",
@@ -336,9 +348,9 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       }
     } else if (message instanceof RpcResponse) {
       RpcResponse resp = (RpcResponse) message;
-      PushRequestInfo info = outstandingPushes.remove(resp.requestId);
+      PushRequestInfo info = removePushRequest(resp.requestId);
       if (info == null) {
-        RpcResponseCallback listener = outstandingRpcs.remove(resp.requestId);
+        RpcResponseCallback listener = removeRpcRequest(resp.requestId);
         if (listener == null) {
           logger.warn(
               "Ignoring response for RPC {} from {} ({} bytes) since it is not 
outstanding",
@@ -362,9 +374,9 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
       }
     } else if (message instanceof RpcFailure) {
       RpcFailure resp = (RpcFailure) message;
-      PushRequestInfo info = outstandingPushes.remove(resp.requestId);
+      PushRequestInfo info = removePushRequest(resp.requestId);
       if (info == null) {
-        RpcResponseCallback listener = outstandingRpcs.remove(resp.requestId);
+        RpcResponseCallback listener = removeRpcRequest(resp.requestId);
         if (listener == null) {
           logger.warn(
               "Ignoring response for RPC {} from {} ({}) since it is not 
outstanding",
@@ -396,4 +408,51 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
   public void updateTimeOfLastRequest() {
     timeOfLastRequestNs.set(System.nanoTime());
   }
+
+  public void handleRpcFailure(long rpcRequestId, String errorMsg, Throwable 
cause) {
+    RpcResponseCallback callback = removeRpcRequest(rpcRequestId);
+    if (callback != null) {
+      callback.onFailure(new CelebornIOException(errorMsg, cause));
+    } else {
+      logger.warn(
+          "RpcResponseCallback {} not found/already addressed when listener 
handles rpc request failure",
+          rpcRequestId);
+    }
+  }
+
+  public void handlePushFailure(long pushRequestId, String errorMsg, Throwable 
cause) {
+    PushRequestInfo info = removePushRequest(pushRequestId);
+    if (info != null) {
+      RpcResponseCallback callback = info.callback;
+      if (callback != null) {
+        callback.onFailure(new CelebornIOException(errorMsg, cause));
+      } else {
+        logger.warn(
+            "PushRequestInfo {} callback is null when handle push request 
failure", pushRequestId);
+      }
+    } else {
+      logger.warn(
+          "PushRequestInfo {} not found/already addressed when listener 
handles push request failure",
+          pushRequestId);
+    }
+  }
+
+  public void handleFetchFailure(
+      StreamChunkSlice streamChunkSlice, String errorMsg, Throwable cause) {
+    FetchRequestInfo info = removeFetchRequest(streamChunkSlice);
+    if (info != null) {
+      ChunkReceivedCallback callback = info.callback;
+      if (callback != null) {
+        callback.onFailure(streamChunkSlice.chunkIndex, new 
IOException(errorMsg, cause));
+      } else {
+        logger.warn(
+            "FetchRequestInfo ({}) callback is null when listener handles 
fetch request failure",
+            streamChunkSlice);
+      }
+    } else {
+      logger.warn(
+          "FetchRequestInfo ({}) not found/already addressed when listener 
handles fetch request failure",
+          streamChunkSlice);
+    }
+  }
 }

Reply via email to