[
https://issues.apache.org/jira/browse/SPARK-13652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179186#comment-15179186
]
huangyu commented on SPARK-13652:
---------------------------------
I applied your patch and it worked, great!
> TransportClient.sendRpcSync returns wrong results
> -------------------------------------------------
>
> Key: SPARK-13652
> URL: https://issues.apache.org/jira/browse/SPARK-13652
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.0
> Reporter: huangyu
> Attachments: RankHandler.java, Test.java
>
>
> TransportClient is not thread safe: if it is called from multiple threads,
> the messages are not encoded and decoded correctly. The code below reproduces
> the problem and prints mismatched responses.
> {code}
> public static void main(String[] args) throws IOException, InterruptedException {
>     TransportServer server = new TransportContext(
>             new TransportConf("test", new MapConfigProvider(new HashMap<String, String>())),
>             new RankHandler())
>         .createServer(8081, new LinkedList<TransportServerBootstrap>());
>     TransportContext context = new TransportContext(
>             new TransportConf("test", new MapConfigProvider(new HashMap<String, String>())),
>             new NoOpRpcHandler(), true);
>     final TransportClientFactory clientFactory = context.createClientFactory();
>     List<Thread> ts = new ArrayList<>();
>     for (int i = 0; i < 10; i++) {
>         ts.add(new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 for (int j = 0; j < 1000; j++) {
>                     try {
>                         ByteBuf buf = Unpooled.buffer(8);
>                         buf.writeLong((long) j);
>                         ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081)
>                                 .sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);
>                         long response = byteBuffer.getLong();
>                         if (response != j) {
>                             System.err.println("send:" + j + ",response:" + response);
>                         }
>                     } catch (IOException e) {
>                         e.printStackTrace();
>                     }
>                 }
>             }
>         }));
>         ts.get(i).start();
>     }
>     for (Thread t : ts) {
>         t.join();
>     }
>     server.close();
> }
>
> public class RankHandler extends RpcHandler {
>     private final Logger logger = LoggerFactory.getLogger(RankHandler.class);
>     private final StreamManager streamManager;
>
>     public RankHandler() {
>         this.streamManager = new OneForOneStreamManager();
>     }
>
>     @Override
>     public void receive(TransportClient client, ByteBuffer msg,
>             RpcResponseCallback callback) {
>         callback.onSuccess(msg);
>     }
>
>     @Override
>     public StreamManager getStreamManager() {
>         return streamManager;
>     }
> }
> {code}
> It prints output like the following:
> send:221,response:222
> send:233,response:234
> send:312,response:313
> send:358,response:359
> ...