This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 96eab1ce [#133] feat(netty): integration-test supports netty. (#1008)
96eab1ce is described below

commit 96eab1cea9ecbcc308f23c2dad8af80e14928269
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Jul 25 11:27:14 2023 +0800

    [#133] feat(netty): integration-test supports netty. (#1008)
    
    ### What changes were proposed in this pull request?
    
    For #133
    
    ### Why are the changes needed?
    Make integration-test support netty.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    UTs.
    
    Co-authored-by: leixianming <[email protected]>
---
 .../apache/spark/shuffle/writer/WriteBufferManager.java    | 10 +++++++++-
 .../java/org/apache/uniffle/common/ShuffleBlockInfo.java   |  6 ++++++
 .../java/org/apache/uniffle/common/config/RssConf.java     |  6 ++++++
 .../org/apache/uniffle/common/netty/MessageEncoder.java    |  1 +
 .../uniffle/common/netty/client/TransportClient.java       |  2 +-
 .../org/apache/uniffle/common/netty/protocol/Encoders.java |  6 +++---
 .../org/apache/uniffle/common/netty/protocol/Message.java  |  2 ++
 .../uniffle/common/netty/protocol/NettyProtocolTest.java   |  8 ++++----
 .../java/org/apache/uniffle/test/CoordinatorGrpcTest.java  |  2 ++
 .../java/org/apache/uniffle/test/IntegrationTestBase.java  |  7 +++++++
 .../uniffle/test/ShuffleUnregisterWithHadoopTest.java      |  3 +--
 .../uniffle/test/ShuffleUnregisterWithLocalfileTest.java   |  3 +--
 .../org/apache/uniffle/test/SparkIntegrationTestBase.java  | 14 +++++++++++++-
 .../test/ContinuousSelectPartitionStrategyTest.java        |  2 ++
 .../uniffle/test/GetShuffleReportForMultiPartTest.java     |  2 ++
 .../uniffle/client/factory/CoordinatorClientFactory.java   |  2 +-
 .../uniffle/server/netty/ShuffleServerNettyHandler.java    | 10 +++++-----
 17 files changed, 66 insertions(+), 20 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 49795555..b6299cd7 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -426,9 +426,17 @@ public class WriteBufferManager extends MemoryConsumer {
               + " bytes");
       // Use final temporary variables for closures
       final long _memoryUsed = memoryUsed;
+      final List<ShuffleBlockInfo> finalShuffleBlockInfosPerEvent = 
shuffleBlockInfoList;
       events.add(
           new AddBlockEvent(
-              taskId, shuffleBlockInfosPerEvent, () -> 
freeAllocatedMemory(_memoryUsed)));
+              taskId,
+              shuffleBlockInfosPerEvent,
+              () -> {
+                freeAllocatedMemory(_memoryUsed);
+                for (ShuffleBlockInfo shuffleBlockInfo : 
finalShuffleBlockInfosPerEvent) {
+                  shuffleBlockInfo.getData().release();
+                }
+              }));
     }
     return events;
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
index fe0df6d1..8de75d90 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
@@ -22,6 +22,8 @@ import java.util.List;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import org.apache.uniffle.common.util.ByteBufUtils;
+
 public class ShuffleBlockInfo {
 
   private int partitionId;
@@ -150,4 +152,8 @@ public class ShuffleBlockInfo {
 
     return sb.toString();
   }
+
+  public synchronized void copyDataTo(ByteBuf to) {
+    ByteBufUtils.copyByteBuf(data, to);
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 73e787b9..805dc30a 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
 import org.apache.uniffle.common.util.UnitConverter;
@@ -665,4 +666,9 @@ public class RssConf implements Cloneable {
   public String getEnv(String key) {
     return System.getenv(key);
   }
+
+  @VisibleForTesting
+  public void remove(String key) {
+    this.settings.remove(key);
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java 
b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
index c417cedc..5221cc42 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
@@ -55,6 +55,7 @@ public class MessageEncoder extends 
ChannelOutboundHandlerAdapter {
     } catch (Exception e) {
       LOG.error("Unexpected exception during process encode!", e);
       byteBuf.release();
+      throw e;
     }
     ctx.writeAndFlush(byteBuf);
     // do transferTo send data after encode buffer send.
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClient.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClient.java
index 674eb4db..acd79a60 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClient.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClient.java
@@ -69,7 +69,7 @@ public class TransportClient implements Closeable {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
     }
-    long requestId = requestId();
+    long requestId = message.getRequestId();
     handler.addResponseCallback(requestId, callback);
     RpcChannelListener listener = new RpcChannelListener(requestId, callback);
     return channel.writeAndFlush(message).addListener(listener);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java
index 7024ef85..b74a517b 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java
@@ -43,8 +43,7 @@ public class Encoders {
     byteBuf.writeLong(shuffleBlockInfo.getCrc());
     byteBuf.writeLong(shuffleBlockInfo.getTaskAttemptId());
     // todo: avoid copy
-    ByteBufUtils.copyByteBuf(shuffleBlockInfo.getData(), byteBuf);
-    shuffleBlockInfo.getData().release();
+    shuffleBlockInfo.copyDataTo(byteBuf);
     List<ShuffleServerInfo> shuffleServerInfoList = 
shuffleBlockInfo.getShuffleServerInfos();
     byteBuf.writeInt(shuffleServerInfoList.size());
     for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfoList) {
@@ -64,7 +63,8 @@ public class Encoders {
     int encodeLength =
         4 * Long.BYTES
             + 4 * Integer.BYTES
-            + ByteBufUtils.encodedLength(shuffleBlockInfo.getData())
+            + Integer.BYTES
+            + shuffleBlockInfo.getLength()
             + Integer.BYTES;
     for (ShuffleServerInfo shuffleServerInfo : 
shuffleBlockInfo.getShuffleServerInfos()) {
       encodeLength += encodeLengthOfShuffleServerInfo(shuffleServerInfo);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
index b0a3da1f..c019099f 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
@@ -131,4 +131,6 @@ public abstract class Message implements Encodable {
         throw new IllegalArgumentException("Unexpected message type: " + 
msgType);
     }
   }
+
+  public abstract long getRequestId();
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
 
b/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
index 09473dd2..48ff31ff 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
@@ -50,7 +50,7 @@ public class NettyProtocolTest {
                 1,
                 1,
                 1,
-                10,
+                data.length,
                 123,
                 Unpooled.wrappedBuffer(data).retain(),
                 shuffleServerInfoList,
@@ -61,7 +61,7 @@ public class NettyProtocolTest {
                 1,
                 1,
                 1,
-                10,
+                data.length,
                 123,
                 Unpooled.wrappedBuffer(data).retain(),
                 shuffleServerInfoList,
@@ -74,7 +74,7 @@ public class NettyProtocolTest {
                 1,
                 2,
                 1,
-                10,
+                data.length,
                 123,
                 Unpooled.wrappedBuffer(data).retain(),
                 shuffleServerInfoList,
@@ -85,7 +85,7 @@ public class NettyProtocolTest {
                 1,
                 1,
                 2,
-                10,
+                data.length,
                 123,
                 Unpooled.wrappedBuffer(data).retain(),
                 shuffleServerInfoList,
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index f252abd0..88a04190 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -73,6 +73,7 @@ public class CoordinatorGrpcTest extends CoordinatorTestBase {
     coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
     createCoordinatorServer(coordinatorConf);
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
     createShuffleServer(shuffleServerConf);
     shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 
1);
     shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
@@ -155,6 +156,7 @@ public class CoordinatorGrpcTest extends 
CoordinatorTestBase {
     withEnvironmentVariables("RSS_ENV_KEY", storageTypeJsonSource)
         .execute(
             () -> {
+              
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
               ShuffleServer ss = new ShuffleServer((ShuffleServerConf) 
shuffleServerConf);
               ss.start();
               shuffleServers.set(0, ss);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index aa1f311b..e73ccfa7 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -23,6 +23,7 @@ import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
@@ -60,6 +61,9 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
   protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();
   protected static List<CoordinatorServer> coordinators = Lists.newArrayList();
 
+  protected static final int NETTY_PORT = 21000;
+  protected static AtomicInteger nettyPortCounter = new AtomicInteger();
+
   public static void startServers() throws Exception {
     for (CoordinatorServer coordinator : coordinators) {
       coordinator.start();
@@ -123,6 +127,9 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     serverConf.setBoolean("rss.server.health.check.enable", false);
     serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
     serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 
500L);
+    serverConf.setInteger(
+        ShuffleServerConf.NETTY_SERVER_PORT, NETTY_PORT + 
nettyPortCounter.getAndIncrement());
+    serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
     return serverConf;
   }
 
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
index a824a645..12299ea3 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
@@ -99,9 +99,8 @@ public class ShuffleUnregisterWithHadoopTest extends 
SparkIntegrationTestBase {
       map = javaPairRDD.collectAsMap();
       shufflePath = appPath + "/1";
       assertTrue(fs.exists(new Path(shufflePath)));
-    } else {
-      runCounter++;
     }
+    runCounter++;
     return map;
   }
 }
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
index 814e0f4c..e68cc74a 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -105,9 +105,8 @@ public class ShuffleUnregisterWithLocalfileTest extends 
SparkIntegrationTestBase
       map = javaPairRDD.collectAsMap();
       shufflePath = appPath + "/1";
       assertTrue(new File(shufflePath).exists());
-    } else {
-      runCounter++;
     }
+    runCounter++;
     return map;
   }
 }
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index de87ac8f..ac37f03c 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -58,15 +58,23 @@ public abstract class SparkIntegrationTestBase extends 
IntegrationTestBase {
     updateSparkConfCustomer(sparkConf);
     start = System.currentTimeMillis();
     Map resultWithRss = runSparkApp(sparkConf, fileName);
-    long durationWithRss = System.currentTimeMillis() - start;
+    final long durationWithRss = System.currentTimeMillis() - start;
 
+    updateSparkConfWithRssNetty(sparkConf);
+    start = System.currentTimeMillis();
+    Map resultWithRssNetty = runSparkApp(sparkConf, fileName);
+    final long durationWithRssNetty = System.currentTimeMillis() - start;
     verifyTestResult(resultWithoutRss, resultWithRss);
+    verifyTestResult(resultWithoutRss, resultWithRssNetty);
 
     LOG.info(
         "Test: durationWithoutRss["
             + durationWithoutRss
             + "], durationWithRss["
             + durationWithRss
+            + "]"
+            + "], durationWithRssNetty["
+            + durationWithRssNetty
             + "]");
   }
 
@@ -110,6 +118,10 @@ public abstract class SparkIntegrationTestBase extends 
IntegrationTestBase {
     sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true");
   }
 
+  public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, "GRPC_NETTY");
+  }
+
   protected void verifyTestResult(Map expected, Map actual) {
     assertEquals(expected.size(), actual.size());
     for (Object expectedKey : expected.keySet()) {
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
index 410e95e1..f15d26bf 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
@@ -79,6 +79,7 @@ public class ContinuousSelectPartitionStrategyTest extends 
SparkIntegrationTestB
       ShuffleServerConf serverConf = new ShuffleServerConf();
       dataFolder.deleteOnExit();
       serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + i);
+      serverConf.setInteger("rss.server.netty.port", NETTY_PORT + i);
       serverConf.setString("rss.storage.type", 
StorageType.MEMORY_LOCALFILE_HDFS.name());
       serverConf.setString("rss.storage.basePath", 
dataFolder.getAbsolutePath());
       serverConf.setString("rss.server.buffer.capacity", 
String.valueOf(671088640 - i));
@@ -94,6 +95,7 @@ public class ContinuousSelectPartitionStrategyTest extends 
SparkIntegrationTestB
       serverConf.setString("rss.server.hadoop.dfs.replication", "2");
       serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 
1024L);
       serverConf.setBoolean("rss.server.health.check.enable", false);
+      serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
       createMockedShuffleServer(serverConf);
     }
     enableRecordGetShuffleResult();
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index 9d4c98b8..f7944ceb 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -83,6 +83,7 @@ public class GetShuffleReportForMultiPartTest extends 
SparkIntegrationTestBase {
       ShuffleServerConf serverConf = new ShuffleServerConf();
       dataFolder.deleteOnExit();
       serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + i);
+      serverConf.setInteger("rss.server.netty.port", NETTY_PORT + i);
       serverConf.setString("rss.storage.type", 
StorageType.MEMORY_LOCALFILE_HDFS.name());
       serverConf.setString("rss.storage.basePath", 
dataFolder.getAbsolutePath());
       serverConf.setString("rss.server.buffer.capacity", "671088640");
@@ -98,6 +99,7 @@ public class GetShuffleReportForMultiPartTest extends 
SparkIntegrationTestBase {
       serverConf.setString("rss.server.hadoop.dfs.replication", "2");
       serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 
1024L);
       serverConf.setBoolean("rss.server.health.check.enable", false);
+      serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
       createMockedShuffleServer(serverConf);
     }
     enableRecordGetShuffleResult();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index 2ba77d8d..b1744dce 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -40,7 +40,7 @@ public class CoordinatorClientFactory {
   }
 
   public CoordinatorClient createCoordinatorClient(String host, int port) {
-    if (clientType.equals(ClientType.GRPC)) {
+    if (clientType.equals(ClientType.GRPC) || 
clientType.equals(ClientType.GRPC_NETTY)) {
       return new CoordinatorGrpcClient(host, port);
     } else {
       throw new UnsupportedOperationException("Unsupported client type " + 
clientType);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 5c3d2f1a..6abbd16e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -133,7 +133,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         responseMessage = errorMsg;
         rpcResponse =
             new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, 
responseMessage);
-        client.sendRpcSync(rpcResponse, RPC_TIMEOUT);
+        client.getChannel().writeAndFlush(rpcResponse);
         return;
       }
       final long start = System.currentTimeMillis();
@@ -209,7 +209,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No 
data in request");
     }
 
-    client.sendRpcSync(rpcResponse, RPC_TIMEOUT);
+    client.getChannel().writeAndFlush(rpcResponse);
   }
 
   public void handleGetMemoryShuffleDataRequest(
@@ -292,7 +292,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           new GetMemoryShuffleDataResponse(
               req.getRequestId(), status, msg, Lists.newArrayList(), 
Unpooled.EMPTY_BUFFER);
     }
-    client.sendRpcSync(response, RPC_TIMEOUT);
+    client.getChannel().writeAndFlush(response);
   }
 
   public void handleGetLocalShuffleIndexRequest(
@@ -374,7 +374,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           new GetLocalShuffleIndexResponse(
               req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
     }
-    client.sendRpcSync(response, RPC_TIMEOUT);
+    client.getChannel().writeAndFlush(response);
   }
 
   public void handleGetLocalShuffleData(TransportClient client, 
GetLocalShuffleDataRequest req) {
@@ -471,7 +471,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           new GetLocalShuffleDataResponse(
               req.getRequestId(), status, msg, new 
NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
     }
-    client.sendRpcSync(response, RPC_TIMEOUT);
+    client.getChannel().writeAndFlush(response);
   }
 
   private List<ShufflePartitionedData> 
toPartitionedData(SendShuffleDataRequest req) {

Reply via email to