chaokunyang commented on code in PR #1483:
URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557302317
##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -128,21 +128,21 @@ public void readToUnsafe(Object target, long
targetPointer, int numBytes) {
@Override
public void readToByteBuffer(ByteBuffer dst, int length) {
- readToByteBuffer0(dst, length);
+ MemoryBuffer buf = memoryBuffer;
+ int remaining = buf.remaining();
+ if (remaining < length) {
+ remaining += fillBuffer(length - remaining);
+ }
+ buf.read(dst, remaining);
}
@Override
public int readToByteBuffer(ByteBuffer dst) {
- return readToByteBuffer0(dst, dst.remaining());
- }
-
- private int readToByteBuffer0(ByteBuffer dst, int length) {
MemoryBuffer buf = memoryBuffer;
int remaining = buf.remaining();
- if (remaining < length) {
- remaining += fillBuffer(length - remaining);
+ if (remaining > 0) {
+ buf.read(dst, remaining);
Review Comment:
We may need to check stream.available here, otherwise continuous invoking
`readToByteBuffer` this method will always return 0
##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
package org.apache.fury.io;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader,
ReadableByteChannel {
private final ReadableByteChannel channel;
- private final ByteBuffer byteBuffer;
- private final MemoryBuffer buffer;
+ private final MemoryBuffer memoryBuffer;
+ private ByteBuffer byteBuffer;
public FuryReadableChannel(ReadableByteChannel channel) {
- this(channel, ByteBuffer.allocate(4096));
+ this(channel, ByteBuffer.allocateDirect(4096));
}
- private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer
directBuffer) {
+ public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer
directBuffer) {
+ Preconditions.checkArgument(
+ directBuffer.isDirect(), "FuryReadableChannel support only direct
ByteBuffer.");
this.channel = channel;
this.byteBuffer = directBuffer;
- this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+ long offHeapAddress = Platform.getAddress(directBuffer) +
directBuffer.position();
+ this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer,
this);
+ }
+
+ @Override
+ public int fillBuffer(int minFillSize) {
+ try {
+ ByteBuffer byteBuf = byteBuffer;
+ MemoryBuffer memoryBuf = memoryBuffer;
+ int position = byteBuf.position();
+ int newLimit = position + minFillSize;
+ if (newLimit > byteBuf.capacity()) {
+ int newSize =
+ newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int)
(newLimit * 1.5);
+ ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+ byteBuf.position(0);
+ newByteBuf.put(byteBuf);
+ byteBuf = byteBuffer = newByteBuf;
+ memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position,
byteBuf);
+ }
+ byteBuf.limit(newLimit);
+ int readCount = channel.read(byteBuf);
+ memoryBuf.increaseSize(readCount);
+ return readCount;
+ } catch (IOException e) {
+ throw new DeserializationException("Failed to read the provided byte
channel", e);
+ }
}
@Override
public int read(ByteBuffer dst) throws IOException {
- throw new UnsupportedEncodingException();
+ int dstRemaining = dst.remaining();
+ if (dstRemaining <= 0) {
+ return 0;
+ }
+ MemoryBuffer buf = memoryBuffer;
+ int remaining = buf.remaining();
+ if (remaining <= 0) {
+ return -1;
+ }
+ if (remaining >= dstRemaining) {
+ byte[] bytes = buf.readBytes(dstRemaining);
Review Comment:
This introduce an extra copy, we can use `buf.read(dst, dstRemaining)`
##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
package org.apache.fury.io;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader,
ReadableByteChannel {
private final ReadableByteChannel channel;
- private final ByteBuffer byteBuffer;
- private final MemoryBuffer buffer;
+ private final MemoryBuffer memoryBuffer;
+ private ByteBuffer byteBuffer;
public FuryReadableChannel(ReadableByteChannel channel) {
- this(channel, ByteBuffer.allocate(4096));
+ this(channel, ByteBuffer.allocateDirect(4096));
}
- private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer
directBuffer) {
+ public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer
directBuffer) {
+ Preconditions.checkArgument(
+ directBuffer.isDirect(), "FuryReadableChannel support only direct
ByteBuffer.");
this.channel = channel;
this.byteBuffer = directBuffer;
- this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+ long offHeapAddress = Platform.getAddress(directBuffer) +
directBuffer.position();
+ this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer,
this);
+ }
+
+ @Override
+ public int fillBuffer(int minFillSize) {
+ try {
+ ByteBuffer byteBuf = byteBuffer;
+ MemoryBuffer memoryBuf = memoryBuffer;
+ int position = byteBuf.position();
+ int newLimit = position + minFillSize;
+ if (newLimit > byteBuf.capacity()) {
+ int newSize =
+ newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int)
(newLimit * 1.5);
+ ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+ byteBuf.position(0);
+ newByteBuf.put(byteBuf);
+ byteBuf = byteBuffer = newByteBuf;
+ memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position,
byteBuf);
+ }
+ byteBuf.limit(newLimit);
+ int readCount = channel.read(byteBuf);
+ memoryBuf.increaseSize(readCount);
+ return readCount;
+ } catch (IOException e) {
+ throw new DeserializationException("Failed to read the provided byte
channel", e);
+ }
}
@Override
public int read(ByteBuffer dst) throws IOException {
- throw new UnsupportedEncodingException();
+ int dstRemaining = dst.remaining();
+ if (dstRemaining <= 0) {
+ return 0;
+ }
+ MemoryBuffer buf = memoryBuffer;
+ int remaining = buf.remaining();
+ if (remaining <= 0) {
Review Comment:
Seems this can be removed, we do need to check `channel` even buffer
remaining size is 0
##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
package org.apache.fury.io;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader,
ReadableByteChannel {
private final ReadableByteChannel channel;
- private final ByteBuffer byteBuffer;
- private final MemoryBuffer buffer;
+ private final MemoryBuffer memoryBuffer;
+ private ByteBuffer byteBuffer;
public FuryReadableChannel(ReadableByteChannel channel) {
- this(channel, ByteBuffer.allocate(4096));
+ this(channel, ByteBuffer.allocateDirect(4096));
}
- private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer
directBuffer) {
+ public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer
directBuffer) {
+ Preconditions.checkArgument(
+ directBuffer.isDirect(), "FuryReadableChannel support only direct
ByteBuffer.");
this.channel = channel;
this.byteBuffer = directBuffer;
- this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+ long offHeapAddress = Platform.getAddress(directBuffer) +
directBuffer.position();
+ this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer,
this);
+ }
+
+ @Override
+ public int fillBuffer(int minFillSize) {
+ try {
+ ByteBuffer byteBuf = byteBuffer;
+ MemoryBuffer memoryBuf = memoryBuffer;
+ int position = byteBuf.position();
+ int newLimit = position + minFillSize;
+ if (newLimit > byteBuf.capacity()) {
+ int newSize =
+ newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int)
(newLimit * 1.5);
+ ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+ byteBuf.position(0);
+ newByteBuf.put(byteBuf);
+ byteBuf = byteBuffer = newByteBuf;
+ memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position,
byteBuf);
+ }
+ byteBuf.limit(newLimit);
+ int readCount = channel.read(byteBuf);
+ memoryBuf.increaseSize(readCount);
+ return readCount;
+ } catch (IOException e) {
+ throw new DeserializationException("Failed to read the provided byte
channel", e);
+ }
}
@Override
public int read(ByteBuffer dst) throws IOException {
- throw new UnsupportedEncodingException();
+ int dstRemaining = dst.remaining();
+ if (dstRemaining <= 0) {
+ return 0;
+ }
+ MemoryBuffer buf = memoryBuffer;
+ int remaining = buf.remaining();
+ if (remaining <= 0) {
+ return -1;
+ }
+ if (remaining >= dstRemaining) {
+ byte[] bytes = buf.readBytes(dstRemaining);
+ dst.put(bytes);
+ return dstRemaining;
+ } else {
+ int filledSize = fillBuffer(dstRemaining - remaining);
+ int length = remaining + filledSize;
+ byte[] bytes = buf.readBytes(length);
+ dst.put(bytes);
+ return length;
Review Comment:
```suggestion
MemoryBuffer buf = buffer;
int remaining = buf.remaining();
int len = dst.remaining();
if (remaining >= len) {
buf.read(dst, len);
return len;
} else {
try {
buf.read(dst, remaining);
return channel.read(dst) + remaining;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]