chaokunyang commented on code in PR #1483:
URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557302317


##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -128,21 +128,21 @@ public void readToUnsafe(Object target, long 
targetPointer, int numBytes) {
 
   @Override
   public void readToByteBuffer(ByteBuffer dst, int length) {
-    readToByteBuffer0(dst, length);
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining < length) {
+      remaining += fillBuffer(length - remaining);
+    }
+    buf.read(dst, remaining);
   }
 
   @Override
   public int readToByteBuffer(ByteBuffer dst) {
-    return readToByteBuffer0(dst, dst.remaining());
-  }
-
-  private int readToByteBuffer0(ByteBuffer dst, int length) {
     MemoryBuffer buf = memoryBuffer;
     int remaining = buf.remaining();
-    if (remaining < length) {
-      remaining += fillBuffer(length - remaining);
+    if (remaining > 0) {
+      buf.read(dst, remaining);

Review Comment:
   We may need to check `stream.available` here; otherwise, continuously 
invoking `readToByteBuffer` will always return 0.



##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
 package org.apache.fury.io;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
 import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
 
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements 
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader, 
ReadableByteChannel {
   private final ReadableByteChannel channel;
-  private final ByteBuffer byteBuffer;
-  private final MemoryBuffer buffer;
+  private final MemoryBuffer memoryBuffer;
+  private ByteBuffer byteBuffer;
 
   public FuryReadableChannel(ReadableByteChannel channel) {
-    this(channel, ByteBuffer.allocate(4096));
+    this(channel, ByteBuffer.allocateDirect(4096));
   }
 
-  private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+  public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+    Preconditions.checkArgument(
+        directBuffer.isDirect(), "FuryReadableChannel support only direct 
ByteBuffer.");
     this.channel = channel;
     this.byteBuffer = directBuffer;
-    this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+    long offHeapAddress = Platform.getAddress(directBuffer) + 
directBuffer.position();
+    this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, 
this);
+  }
+
+  @Override
+  public int fillBuffer(int minFillSize) {
+    try {
+      ByteBuffer byteBuf = byteBuffer;
+      MemoryBuffer memoryBuf = memoryBuffer;
+      int position = byteBuf.position();
+      int newLimit = position + minFillSize;
+      if (newLimit > byteBuf.capacity()) {
+        int newSize =
+            newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) 
(newLimit * 1.5);
+        ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+        byteBuf.position(0);
+        newByteBuf.put(byteBuf);
+        byteBuf = byteBuffer = newByteBuf;
+        memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, 
byteBuf);
+      }
+      byteBuf.limit(newLimit);
+      int readCount = channel.read(byteBuf);
+      memoryBuf.increaseSize(readCount);
+      return readCount;
+    } catch (IOException e) {
+      throw new DeserializationException("Failed to read the provided byte 
channel", e);
+    }
   }
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
-    throw new UnsupportedEncodingException();
+    int dstRemaining = dst.remaining();
+    if (dstRemaining <= 0) {
+      return 0;
+    }
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining <= 0) {
+      return -1;
+    }
+    if (remaining >= dstRemaining) {
+      byte[] bytes = buf.readBytes(dstRemaining);

Review Comment:
   This introduces an extra copy; we can use `buf.read(dst, dstRemaining)` instead.



##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
 package org.apache.fury.io;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
 import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
 
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements 
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader, 
ReadableByteChannel {
   private final ReadableByteChannel channel;
-  private final ByteBuffer byteBuffer;
-  private final MemoryBuffer buffer;
+  private final MemoryBuffer memoryBuffer;
+  private ByteBuffer byteBuffer;
 
   public FuryReadableChannel(ReadableByteChannel channel) {
-    this(channel, ByteBuffer.allocate(4096));
+    this(channel, ByteBuffer.allocateDirect(4096));
   }
 
-  private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+  public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+    Preconditions.checkArgument(
+        directBuffer.isDirect(), "FuryReadableChannel support only direct 
ByteBuffer.");
     this.channel = channel;
     this.byteBuffer = directBuffer;
-    this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+    long offHeapAddress = Platform.getAddress(directBuffer) + 
directBuffer.position();
+    this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, 
this);
+  }
+
+  @Override
+  public int fillBuffer(int minFillSize) {
+    try {
+      ByteBuffer byteBuf = byteBuffer;
+      MemoryBuffer memoryBuf = memoryBuffer;
+      int position = byteBuf.position();
+      int newLimit = position + minFillSize;
+      if (newLimit > byteBuf.capacity()) {
+        int newSize =
+            newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) 
(newLimit * 1.5);
+        ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+        byteBuf.position(0);
+        newByteBuf.put(byteBuf);
+        byteBuf = byteBuffer = newByteBuf;
+        memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, 
byteBuf);
+      }
+      byteBuf.limit(newLimit);
+      int readCount = channel.read(byteBuf);
+      memoryBuf.increaseSize(readCount);
+      return readCount;
+    } catch (IOException e) {
+      throw new DeserializationException("Failed to read the provided byte 
channel", e);
+    }
   }
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
-    throw new UnsupportedEncodingException();
+    int dstRemaining = dst.remaining();
+    if (dstRemaining <= 0) {
+      return 0;
+    }
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining <= 0) {

Review Comment:
   It seems this check can be removed; we still need to check `channel` even 
when the buffer's remaining size is 0.



##########
java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java:
##########
@@ -20,30 +20,130 @@
 package org.apache.fury.io;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fury.exception.DeserializationException;
 import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.util.Platform;
+import org.apache.fury.util.Preconditions;
 
-// TODO support zero-copy channel reading.
-public class FuryReadableChannel extends AbstractStreamReader implements 
ReadableByteChannel {
+@NotThreadSafe
+public class FuryReadableChannel implements FuryStreamReader, 
ReadableByteChannel {
   private final ReadableByteChannel channel;
-  private final ByteBuffer byteBuffer;
-  private final MemoryBuffer buffer;
+  private final MemoryBuffer memoryBuffer;
+  private ByteBuffer byteBuffer;
 
   public FuryReadableChannel(ReadableByteChannel channel) {
-    this(channel, ByteBuffer.allocate(4096));
+    this(channel, ByteBuffer.allocateDirect(4096));
   }
 
-  private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+  public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer 
directBuffer) {
+    Preconditions.checkArgument(
+        directBuffer.isDirect(), "FuryReadableChannel support only direct 
ByteBuffer.");
     this.channel = channel;
     this.byteBuffer = directBuffer;
-    this.buffer = MemoryBuffer.fromByteBuffer(directBuffer);
+
+    long offHeapAddress = Platform.getAddress(directBuffer) + 
directBuffer.position();
+    this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, 
this);
+  }
+
+  @Override
+  public int fillBuffer(int minFillSize) {
+    try {
+      ByteBuffer byteBuf = byteBuffer;
+      MemoryBuffer memoryBuf = memoryBuffer;
+      int position = byteBuf.position();
+      int newLimit = position + minFillSize;
+      if (newLimit > byteBuf.capacity()) {
+        int newSize =
+            newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) 
(newLimit * 1.5);
+        ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize);
+        byteBuf.position(0);
+        newByteBuf.put(byteBuf);
+        byteBuf = byteBuffer = newByteBuf;
+        memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, 
byteBuf);
+      }
+      byteBuf.limit(newLimit);
+      int readCount = channel.read(byteBuf);
+      memoryBuf.increaseSize(readCount);
+      return readCount;
+    } catch (IOException e) {
+      throw new DeserializationException("Failed to read the provided byte 
channel", e);
+    }
   }
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
-    throw new UnsupportedEncodingException();
+    int dstRemaining = dst.remaining();
+    if (dstRemaining <= 0) {
+      return 0;
+    }
+    MemoryBuffer buf = memoryBuffer;
+    int remaining = buf.remaining();
+    if (remaining <= 0) {
+      return -1;
+    }
+    if (remaining >= dstRemaining) {
+      byte[] bytes = buf.readBytes(dstRemaining);
+      dst.put(bytes);
+      return dstRemaining;
+    } else {
+      int filledSize = fillBuffer(dstRemaining - remaining);
+      int length = remaining + filledSize;
+      byte[] bytes = buf.readBytes(length);
+      dst.put(bytes);
+      return length;

Review Comment:
   ```suggestion
       MemoryBuffer buf = buffer;
       int remaining = buf.remaining();
       int len = dst.remaining();
       if (remaining >= len) {
         buf.read(dst, len);
         return len;
       } else {
         try {
           buf.read(dst, remaining);
           return channel.read(dst) + remaining;
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to