This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e6bb84  THRIFT-5288: Move Support for ByteBuffer into TTransport Client: Java Patch: David Mollitor
6e6bb84 is described below

commit 6e6bb84be9d8ace4be9744d5637fbb59f58db463
Author: David Mollitor <[email protected]>
AuthorDate: Fri Oct 2 21:10:50 2020 +0200

    THRIFT-5288: Move Support for ByteBuffer into TTransport
    Client: Java
    Patch: David Mollitor
    
    This closes #2254
---
 .../org/apache/thrift/async/TAsyncMethodCall.java  | 16 ++++-----
 .../apache/thrift/protocol/TCompactProtocol.java   | 20 +++++------
 .../thrift/server/AbstractNonblockingServer.java   |  8 ++---
 .../thrift/transport/TNonblockingSocket.java       | 23 +++++++------
 .../thrift/transport/TNonblockingTransport.java    |  4 ---
 .../org/apache/thrift/transport/TTransport.java    | 39 ++++++++++++++++++++++
 .../apache/thrift/transport/sasl/FrameWriter.java  |  5 +--
 .../transport/sasl/NonblockingSaslHandler.java     |  9 +++--
 .../thrift/transport/sasl/TestDataFrameWriter.java |  2 +-
 9 files changed, 81 insertions(+), 45 deletions(-)

diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index d5c608d..a119f23 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -217,9 +217,9 @@ public abstract class TAsyncMethodCall<T> {
     state = State.ERROR;
   }
 
-  private void doReadingResponseBody(SelectionKey key) throws IOException {
+  private void doReadingResponseBody(SelectionKey key) throws TTransportException {
     if (transport.read(frameBuffer) < 0) {
-      throw new IOException("Read call frame failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
     }
     if (frameBuffer.remaining() == 0) {
       cleanUpAndFireCallback(key);
@@ -241,9 +241,9 @@ public abstract class TAsyncMethodCall<T> {
     }
   }
 
-  private void doReadingResponseSize() throws IOException {
+  private void doReadingResponseSize() throws TTransportException {
     if (transport.read(sizeBuffer) < 0) {
-      throw new IOException("Read call frame size failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed");
     }
     if (sizeBuffer.remaining() == 0) {
       state = State.READING_RESPONSE_BODY;
@@ -251,9 +251,9 @@ public abstract class TAsyncMethodCall<T> {
     }
   }
 
-  private void doWritingRequestBody(SelectionKey key) throws IOException {
+  private void doWritingRequestBody(SelectionKey key) throws TTransportException {
     if (transport.write(frameBuffer) < 0) {
-      throw new IOException("Write call frame failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
     }
     if (frameBuffer.remaining() == 0) {
       if (isOneway) {
@@ -266,9 +266,9 @@ public abstract class TAsyncMethodCall<T> {
     }
   }
 
-  private void doWritingRequestSize() throws IOException {
+  private void doWritingRequestSize() throws TTransportException {
     if (transport.write(sizeBuffer) < 0) {
-      throw new IOException("Write call frame size failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed");
     }
     if (sizeBuffer.remaining() == 0) {
       state = State.WRITING_REQUEST_BODY;
diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
index 0dfcf25..4f4e21f 100644
--- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
+++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
@@ -363,20 +363,17 @@ public class TCompactProtocol extends TProtocol {
    */
   public void writeString(String str) throws TException {
     byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
-    writeBinary(bytes, 0, bytes.length);
+    writeVarint32(bytes.length);
+    trans_.write(bytes, 0, bytes.length);
   }
 
   /**
    * Write a byte array, using a varint for the size.
    */
   public void writeBinary(ByteBuffer bin) throws TException {
-    int length = bin.limit() - bin.position();
-    writeBinary(bin.array(), bin.position() + bin.arrayOffset(), length);
-  }
-
-  private void writeBinary(byte[] buf, int offset, int length) throws TException {
-    writeVarint32(length);
-    trans_.write(buf, offset, length);
+    ByteBuffer bb = bin.asReadOnlyBuffer();
+    writeVarint32(bb.remaining());
+    trans_.write(bb);
   }
 
   //
@@ -694,12 +691,13 @@ public class TCompactProtocol extends TProtocol {
   }
 
   /**
-   * Read a byte[] from the wire.
+   * Read a ByteBuffer from the wire.
    */
   public ByteBuffer readBinary() throws TException {
     int length = readVarint32();
-    
-    if (length == 0) return EMPTY_BUFFER;
+    if (length == 0) {
+      return EMPTY_BUFFER;
+    }
     getTransport().checkReadBytesAvailable(length);
     if (trans_.getBytesRemainingInBuffer() >= length) {
       ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length);
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 4aae803..f91e825 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -414,8 +414,8 @@ public abstract class AbstractNonblockingServer extends TServer {
           if (trans_.write(buffer_) < 0) {
             return false;
           }
-        } catch (IOException e) {
-          LOGGER.warn("Got an IOException during write!", e);
+        } catch (TTransportException e) {
+          LOGGER.warn("Got an Exception during write", e);
           return false;
         }
 
@@ -543,8 +543,8 @@ public abstract class AbstractNonblockingServer extends TServer {
     private boolean internalRead() {
       try {
           return trans_.read(buffer_) >= 0;
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException in internalRead!", e);
+      } catch (TTransportException e) {
+        LOGGER.warn("Got an Exception in internalRead", e);
         return false;
       }
     }
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 76ed02c..13c8586 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -144,11 +144,14 @@ public class TNonblockingSocket extends TNonblockingTransport {
   /**
    * Perform a nonblocking read into buffer.
    */
-  public int read(ByteBuffer buffer) throws IOException {
-    return socketChannel_.read(buffer);
+  public int read(ByteBuffer buffer) throws TTransportException {
+    try {
+      return socketChannel_.read(buffer);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
   }
 
-
   /**
    * Reads from the underlying input stream if not null.
    */
@@ -167,8 +170,12 @@ public class TNonblockingSocket extends TNonblockingTransport {
   /**
    * Perform a nonblocking write of the data in buffer;
    */
-  public int write(ByteBuffer buffer) throws IOException {
-    return socketChannel_.write(buffer);
+  public int write(ByteBuffer buffer) throws TTransportException {
+    try {
+      return socketChannel_.write(buffer);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
   }
 
   /**
@@ -179,11 +186,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
       throw new TTransportException(TTransportException.NOT_OPEN,
         "Cannot write to write-only socket channel");
     }
-    try {
-      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
-    } catch (IOException iox) {
-      throw new TTransportException(TTransportException.UNKNOWN, iox);
-    }
+    write(ByteBuffer.wrap(buf, off, len));
   }
 
   /**
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 255595d..30ec9d2 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -23,7 +23,6 @@ import org.apache.thrift.TConfiguration;
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 
@@ -47,7 +46,4 @@ public abstract class TNonblockingTransport extends TEndpointTransport {
 
   public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
 
-  public abstract int read(ByteBuffer buffer) throws IOException;
-
-  public abstract int write(ByteBuffer buffer) throws IOException;
 }
diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java
index 5645f7f..ee07024 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransport.java
@@ -22,6 +22,7 @@ package org.apache.thrift.transport;
 import org.apache.thrift.TConfiguration;
 
 import java.io.Closeable;
+import java.nio.ByteBuffer;
 
 /**
  * Generic class that encapsulates the I/O layer. This is basically a thin
@@ -60,6 +61,26 @@ public abstract class TTransport implements Closeable {
   public abstract void close();
 
   /**
+   * Reads a sequence of bytes from this channel into the given buffer. An
+   * attempt is made to read up to the number of bytes remaining in the buffer,
+   * that is, dst.remaining(), at the moment this method is invoked. Upon return
+   * the buffer's position will move forward the number of bytes read; its limit
+   * will not have changed. Subclasses are encouraged to provide a more
+   * efficient implementation of this method.
+   *
+   * @param dst The buffer into which bytes are to be transferred
+   * @return The number of bytes read, possibly zero, or -1 if the channel has
+   *         reached end-of-stream
+   * @throws TTransportException if there was an error reading data
+   */
+  public int read(ByteBuffer dst) throws TTransportException {
+    byte[] arr = new byte[dst.remaining()];
+    int n = read(arr, 0, arr.length);
+    dst.put(arr, 0, n);
+    return n;
+  }
+
+  /**
    * Reads up to len bytes into buffer buf, starting at offset off.
    *
    * @param buf Array to read into
@@ -121,6 +142,24 @@ public abstract class TTransport implements Closeable {
     throws TTransportException;
 
   /**
+   * Writes a sequence of bytes to the buffer. An attempt is made to write all
+   * remaining bytes in the buffer, that is, src.remaining(), at the moment this
+   * method is invoked. Upon return the buffer's position will be updated; its limit
+   * will not have changed. Subclasses are encouraged to provide a more efficient
+   * implementation of this method.
+   *
+   * @param src The buffer from which bytes are to be retrieved
+   * @return The number of bytes written, possibly zero
+   * @throws TTransportException if there was an error writing data
+   */
+  public int write(ByteBuffer src) throws TTransportException {
+    byte[] arr = new byte[src.remaining()];
+    src.get(arr);
+    write(arr, 0, arr.length);
+    return arr.length;
+  }
+
+  /**
    * Flush any pending data out of a transport buffer.
    *
    * @throws TTransportException if there was an error writing out data.
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
index e5feba0..4357f13 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
 
 /**
  * Write frame (header and payload) to transport in a nonblocking way.
@@ -99,9 +100,9 @@ public abstract class FrameWriter {
   /**
    * Nonblocking write to the underlying transport.
    *
-   * @throws IOException
+   * @throws TTransportException
    */
-  public void write(TNonblockingTransport transport) throws IOException {
+  public void write(TNonblockingTransport transport) throws TTransportException {
     transport.write(frameBytes);
   }
 
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
index 4557146..d73c3ec 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.thrift.transport.sasl;
 
-import java.io.IOException;
 import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 
@@ -364,7 +363,7 @@ public class NonblockingSaslHandler {
         saslChallenge.clear();
         nextPhase = Phase.READING_SASL_RESPONSE;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
@@ -378,7 +377,7 @@ public class NonblockingSaslHandler {
         saslResponse = null;
         nextPhase = Phase.READING_REQUEST;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
@@ -389,7 +388,7 @@ public class NonblockingSaslHandler {
       if (saslChallenge.isComplete()) {
         nextPhase = Phase.CLOSING;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
@@ -401,7 +400,7 @@ public class NonblockingSaslHandler {
         responseWriter.clear();
         nextPhase = Phase.READING_REQUEST;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
diff --git a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
index d242593..60fe5c9 100644
--- a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
+++ b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
@@ -72,7 +72,7 @@ public class TestDataFrameWriter {
   }
 
   @Test
-  public void testWrite() throws IOException {
+  public void testWrite() throws Exception {
     DataFrameWriter frameWriter = new DataFrameWriter();
     frameWriter.withOnlyPayload(BYTES);
     // Slow socket which writes one byte per call.

Reply via email to