This is an automated email from the ASF dual-hosted git repository.
jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git
The following commit(s) were added to refs/heads/master by this push:
new 6e6bb84 THRIFT-5288: Move Support for ByteBuffer into TTransport
Client: Java Patch: David Mollitor
6e6bb84 is described below
commit 6e6bb84be9d8ace4be9744d5637fbb59f58db463
Author: David Mollitor <[email protected]>
AuthorDate: Fri Oct 2 21:10:50 2020 +0200
THRIFT-5288: Move Support for ByteBuffer into TTransport
Client: Java
Patch: David Mollitor
This closes #2254
---
.../org/apache/thrift/async/TAsyncMethodCall.java | 16 ++++-----
.../apache/thrift/protocol/TCompactProtocol.java | 20 +++++------
.../thrift/server/AbstractNonblockingServer.java | 8 ++---
.../thrift/transport/TNonblockingSocket.java | 23 +++++++------
.../thrift/transport/TNonblockingTransport.java | 4 ---
.../org/apache/thrift/transport/TTransport.java | 39 ++++++++++++++++++++++
.../apache/thrift/transport/sasl/FrameWriter.java | 5 +--
.../transport/sasl/NonblockingSaslHandler.java | 9 +++--
.../thrift/transport/sasl/TestDataFrameWriter.java | 2 +-
9 files changed, 81 insertions(+), 45 deletions(-)
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index d5c608d..a119f23 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -217,9 +217,9 @@ public abstract class TAsyncMethodCall<T> {
state = State.ERROR;
}
- private void doReadingResponseBody(SelectionKey key) throws IOException {
+ private void doReadingResponseBody(SelectionKey key) throws TTransportException {
if (transport.read(frameBuffer) < 0) {
- throw new IOException("Read call frame failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
}
if (frameBuffer.remaining() == 0) {
cleanUpAndFireCallback(key);
@@ -241,9 +241,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doReadingResponseSize() throws IOException {
+ private void doReadingResponseSize() throws TTransportException {
if (transport.read(sizeBuffer) < 0) {
- throw new IOException("Read call frame size failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.READING_RESPONSE_BODY;
@@ -251,9 +251,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doWritingRequestBody(SelectionKey key) throws IOException {
+ private void doWritingRequestBody(SelectionKey key) throws TTransportException {
if (transport.write(frameBuffer) < 0) {
- throw new IOException("Write call frame failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
}
if (frameBuffer.remaining() == 0) {
if (isOneway) {
@@ -266,9 +266,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doWritingRequestSize() throws IOException {
+ private void doWritingRequestSize() throws TTransportException {
if (transport.write(sizeBuffer) < 0) {
- throw new IOException("Write call frame size failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.WRITING_REQUEST_BODY;
diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
index 0dfcf25..4f4e21f 100644
--- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
+++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
@@ -363,20 +363,17 @@ public class TCompactProtocol extends TProtocol {
*/
public void writeString(String str) throws TException {
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
- writeBinary(bytes, 0, bytes.length);
+ writeVarint32(bytes.length);
+ trans_.write(bytes, 0, bytes.length);
}
/**
* Write a byte array, using a varint for the size.
*/
public void writeBinary(ByteBuffer bin) throws TException {
- int length = bin.limit() - bin.position();
- writeBinary(bin.array(), bin.position() + bin.arrayOffset(), length);
- }
-
- private void writeBinary(byte[] buf, int offset, int length) throws TException {
- writeVarint32(length);
- trans_.write(buf, offset, length);
+ ByteBuffer bb = bin.asReadOnlyBuffer();
+ writeVarint32(bb.remaining());
+ trans_.write(bb);
}
//
@@ -694,12 +691,13 @@ public class TCompactProtocol extends TProtocol {
}
/**
- * Read a byte[] from the wire.
+ * Read a ByteBuffer from the wire.
*/
public ByteBuffer readBinary() throws TException {
int length = readVarint32();
-
- if (length == 0) return EMPTY_BUFFER;
+ if (length == 0) {
+ return EMPTY_BUFFER;
+ }
getTransport().checkReadBytesAvailable(length);
if (trans_.getBytesRemainingInBuffer() >= length) {
ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length);
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 4aae803..f91e825 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -414,8 +414,8 @@ public abstract class AbstractNonblockingServer extends TServer {
if (trans_.write(buffer_) < 0) {
return false;
}
- } catch (IOException e) {
- LOGGER.warn("Got an IOException during write!", e);
+ } catch (TTransportException e) {
+ LOGGER.warn("Got an Exception during write", e);
return false;
}
@@ -543,8 +543,8 @@ public abstract class AbstractNonblockingServer extends TServer {
private boolean internalRead() {
try {
return trans_.read(buffer_) >= 0;
- } catch (IOException e) {
- LOGGER.warn("Got an IOException in internalRead!", e);
+ } catch (TTransportException e) {
+ LOGGER.warn("Got an Exception in internalRead", e);
return false;
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 76ed02c..13c8586 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -144,11 +144,14 @@ public class TNonblockingSocket extends TNonblockingTransport {
/**
* Perform a nonblocking read into buffer.
*/
- public int read(ByteBuffer buffer) throws IOException {
- return socketChannel_.read(buffer);
+ public int read(ByteBuffer buffer) throws TTransportException {
+ try {
+ return socketChannel_.read(buffer);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
}
-
/**
* Reads from the underlying input stream if not null.
*/
@@ -167,8 +170,12 @@ public class TNonblockingSocket extends TNonblockingTransport {
/**
* Perform a nonblocking write of the data in buffer;
*/
- public int write(ByteBuffer buffer) throws IOException {
- return socketChannel_.write(buffer);
+ public int write(ByteBuffer buffer) throws TTransportException {
+ try {
+ return socketChannel_.write(buffer);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
}
/**
@@ -179,11 +186,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot write to write-only socket channel");
}
- try {
- socketChannel_.write(ByteBuffer.wrap(buf, off, len));
- } catch (IOException iox) {
- throw new TTransportException(TTransportException.UNKNOWN, iox);
- }
+ write(ByteBuffer.wrap(buf, off, len));
}
/**
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 255595d..30ec9d2 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -23,7 +23,6 @@ import org.apache.thrift.TConfiguration;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -47,7 +46,4 @@ public abstract class TNonblockingTransport extends TEndpointTransport {
public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
- public abstract int read(ByteBuffer buffer) throws IOException;
-
- public abstract int write(ByteBuffer buffer) throws IOException;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java
index 5645f7f..ee07024 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransport.java
@@ -22,6 +22,7 @@ package org.apache.thrift.transport;
import org.apache.thrift.TConfiguration;
import java.io.Closeable;
+import java.nio.ByteBuffer;
/**
* Generic class that encapsulates the I/O layer. This is basically a thin
@@ -60,6 +61,26 @@ public abstract class TTransport implements Closeable {
public abstract void close();
/**
+ * Reads a sequence of bytes from this channel into the given buffer. An
+ * attempt is made to read up to the number of bytes remaining in the buffer,
+ * that is, dst.remaining(), at the moment this method is invoked. Upon return
+ * the buffer's position will move forward the number of bytes read; its limit
+ * will not have changed. Subclasses are encouraged to provide a more
+ * efficient implementation of this method.
+ *
+ * @param dst The buffer into which bytes are to be transferred
+ * @return The number of bytes read, possibly zero, or -1 if the channel has
+ * reached end-of-stream
+ * @throws TTransportException if there was an error reading data
+ */
+ public int read(ByteBuffer dst) throws TTransportException {
+ byte[] arr = new byte[dst.remaining()];
+ int n = read(arr, 0, arr.length);
+ dst.put(arr, 0, n);
+ return n;
+ }
+
+ /**
* Reads up to len bytes into buffer buf, starting at offset off.
*
* @param buf Array to read into
@@ -121,6 +142,24 @@ public abstract class TTransport implements Closeable {
throws TTransportException;
/**
+ * Writes a sequence of bytes to the buffer. An attempt is made to write all
+ * remaining bytes in the buffer, that is, src.remaining(), at the moment this
+ * method is invoked. Upon return the buffer's position will updated; its limit
+ * will not have changed. Subclasses are encouraged to provide a more efficient
+ * implementation of this method.
+ *
+ * @param src The buffer from which bytes are to be retrieved
+ * @return The number of bytes written, possibly zero
+ * @throws TTransportException if there was an error writing data
+ */
+ public int write(ByteBuffer src) throws TTransportException {
+ byte[] arr = new byte[src.remaining()];
+ src.get(arr);
+ write(arr, 0, arr.length);
+ return arr.length;
+ }
+
+ /**
* Flush any pending data out of a transport buffer.
*
* @throws TTransportException if there was an error writing out data.
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
index e5feba0..4357f13 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
/**
* Write frame (header and payload) to transport in a nonblocking way.
@@ -99,9 +100,9 @@ public abstract class FrameWriter {
/**
* Nonblocking write to the underlying transport.
*
- * @throws IOException
+ * @throws TTransportException
*/
- public void write(TNonblockingTransport transport) throws IOException {
+ public void write(TNonblockingTransport transport) throws TTransportException {
transport.write(frameBytes);
}
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
index 4557146..d73c3ec 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
@@ -19,7 +19,6 @@
package org.apache.thrift.transport.sasl;
-import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
@@ -364,7 +363,7 @@ public class NonblockingSaslHandler {
saslChallenge.clear();
nextPhase = Phase.READING_SASL_RESPONSE;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -378,7 +377,7 @@ public class NonblockingSaslHandler {
saslResponse = null;
nextPhase = Phase.READING_REQUEST;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -389,7 +388,7 @@ public class NonblockingSaslHandler {
if (saslChallenge.isComplete()) {
nextPhase = Phase.CLOSING;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -401,7 +400,7 @@ public class NonblockingSaslHandler {
responseWriter.clear();
nextPhase = Phase.READING_REQUEST;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
diff --git a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
index d242593..60fe5c9 100644
--- a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
+++ b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
@@ -72,7 +72,7 @@ public class TestDataFrameWriter {
}
@Test
- public void testWrite() throws IOException {
+ public void testWrite() throws Exception {
DataFrameWriter frameWriter = new DataFrameWriter();
frameWriter.withOnlyPayload(BYTES);
// Slow socket which writes one byte per call.