waitinfuture commented on code in PR #2133:
URL:
https://github.com/apache/incubator-celeborn/pull/2133#discussion_r1421683539
##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java:
##########
@@ -278,6 +280,59 @@ public void onFailure(Throwable e) {
}
}
+ public ByteBuffer openStream(
+ TransportMessage message, long timeoutMs, long openStreamRetryInterval)
throws IOException {
+ BlockingQueue<ByteBuffer> result = new LinkedBlockingQueue(1);
+ BlockingQueue<Throwable> exception = new LinkedBlockingQueue(1);
+ ByteBuffer response = null;
+
+ RpcResponseCallback openStreamCallback =
+ new RpcResponseCallback() {
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+ copy.put(response);
+ // flip "copy" to make it readable
+ copy.flip();
+ result.add(copy);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ exception.add(e);
+ }
+ };
+ long startOpenTime = System.currentTimeMillis();
+ sendRpc(message.toByteBuffer(), openStreamCallback);
+ while (response == null) {
+ try {
+ response = result.poll(100, TimeUnit.MILLISECONDS);
+ Throwable e = exception.poll();
+ if (response == null) {
+ if (e != null) {
+ if (e.getMessage().contains("FileUnderSortingException")) {
+ logger.info(
+ "open stream fail cause by "
+ + e.getMessage()
+ + " and will reopen it after "
+ + openStreamRetryInterval
+ + "ms");
+ Thread.sleep(openStreamRetryInterval);
+ } else throw (IOException) e;
+ if (System.currentTimeMillis() - startOpenTime > timeoutMs) {
Review Comment:
It seems L322-325 (the open-stream timeout check) should be moved out of the `if (e != null) {` block.
##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java:
##########
@@ -278,6 +280,59 @@ public void onFailure(Throwable e) {
}
}
+ public ByteBuffer openStream(
+ TransportMessage message, long timeoutMs, long openStreamRetryInterval)
throws IOException {
+ BlockingQueue<ByteBuffer> result = new LinkedBlockingQueue(1);
+ BlockingQueue<Throwable> exception = new LinkedBlockingQueue(1);
+ ByteBuffer response = null;
+
+ RpcResponseCallback openStreamCallback =
+ new RpcResponseCallback() {
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+ copy.put(response);
+ // flip "copy" to make it readable
+ copy.flip();
+ result.add(copy);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ exception.add(e);
+ }
+ };
+ long startOpenTime = System.currentTimeMillis();
+ sendRpc(message.toByteBuffer(), openStreamCallback);
+ while (response == null) {
+ try {
+ response = result.poll(100, TimeUnit.MILLISECONDS);
+ Throwable e = exception.poll();
+ if (response == null) {
+ if (e != null) {
+ if (e.getMessage().contains("FileUnderSortingException")) {
+ logger.info(
+ "open stream fail cause by "
+ + e.getMessage()
+ + " and will reopen it after "
+ + openStreamRetryInterval
+ + "ms");
+ Thread.sleep(openStreamRetryInterval);
+ } else throw (IOException) e;
+ if (System.currentTimeMillis() - startOpenTime > timeoutMs) {
+ logger.info("open stream timeout");
+ throw new IOException("open stream timeout!");
+ }
+ sendRpc(message.toByteBuffer(), openStreamCallback);
Review Comment:
The `exception` queue needs to be cleared before sending the RPC again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]