This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bad3e166aa3 HDDS-14035. StreamRead: Positioned-read should not do pre-read. (#9425)
bad3e166aa3 is described below

commit bad3e166aa3e4e8d07526a8ccba0cefeab5119e9
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Dec 4 10:14:24 2025 -0800

    HDDS-14035. StreamRead: Positioned-read should not do pre-read. (#9425)
---
 .../hadoop/hdds/scm/storage/ByteBufferReader.java  |  21 ++--
 .../hdds/scm/storage/ExtendedInputStream.java      |  26 +++--
 .../hdds/scm/storage/MultipartInputStream.java     |  39 ++++++-
 .../hdds/scm/storage/StreamBlockInputStream.java   | 120 ++++++++++++++-------
 .../rpc/read/TestStreamBlockInputStream.java       |  55 ++++++++++
 .../ozone/client/rpc/read/TestStreamRead.java      |  24 ++---
 .../apache/hadoop/fs/ozone/OzoneFSInputStream.java |   8 ++
 7 files changed, 220 insertions(+), 73 deletions(-)
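
For context, pre-read (read-ahead) speculatively requests bytes past the current read so that a sequential scan finds the next chunk already buffered; for a positioned read the next offset is unpredictable, so pre-read data is usually wasted. A minimal caller-side sketch of the two access patterns against the stock Hadoop FSDataInputStream API (the stream and sizes are illustrative, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;

    class AccessPatterns {
      static void scanAndProbe(FSDataInputStream in) throws IOException {
        // Sequential scan: each read continues at the current position, so
        // speculatively pre-reading the next chunk is usually a win.
        final byte[] chunk = new byte[1 << 20];
        while (in.read(chunk, 0, chunk.length) != -1) {
          // process chunk ...
        }

        // Positioned read: random access that leaves the stream position
        // untouched; anything pre-read beyond 'page' is likely thrown away.
        final byte[] page = new byte[4096];
        in.readFully(42L << 20, page, 0, page.length);
      }
    }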

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
index f4b493144e5..387126028b1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
@@ -17,11 +17,12 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.ratis.util.Preconditions;
 
 /**
  * A {@link ByteReaderStrategy} implementation which supports ByteBuffer as the
@@ -32,18 +33,22 @@ public class ByteBufferReader implements ByteReaderStrategy {
   private int targetLen;
 
   public ByteBufferReader(ByteBuffer buf) {
-    if (buf == null) {
-      throw new NullPointerException();
-    }
-    this.readBuf = buf;
+    this.readBuf = Objects.requireNonNull(buf, "buf == null");
     this.targetLen = buf.remaining();
   }
 
+  ByteBuffer getBuffer() {
+    return readBuf;
+  }
+
+  int readImpl(InputStream inputStream) throws IOException {
+    return Preconditions.assertInstanceOf(inputStream, ByteBufferReadable.class)
+        .read(readBuf);
+  }
+
   @Override
   public int readFromBlock(InputStream is, int numBytesToRead) throws
       IOException {
-    Preconditions.checkArgument(is != null);
-    Preconditions.checkArgument(is instanceof ByteBufferReadable);
     // change buffer limit
     int bufferLimit = readBuf.limit();
     if (numBytesToRead < targetLen) {
@@ -51,7 +56,7 @@ public int readFromBlock(InputStream is, int numBytesToRead) throws
     }
     int numBytesRead;
     try {
-      numBytesRead = ((ByteBufferReadable)is).read(readBuf);
+      numBytesRead = readImpl(is);
     } finally {
       // restore buffer limit
       if (numBytesToRead < targetLen) {
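
The new readImpl(...) hook turns ByteBufferReader into a small template method: readFromBlock keeps the limit-capping and restore bookkeeping, while a subclass can override just the transfer (MultipartInputStream does exactly that further down to reach StreamBlockInputStream.readFully). A standalone sketch of the same split, with simplified stand-in types rather than the real Ozone classes:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    class BufferReaderSketch {
      private final ByteBuffer readBuf;

      BufferReaderSketch(ByteBuffer buf) {
        this.readBuf = buf;
      }

      // The overridable hook: subclasses may redirect the actual transfer.
      int readImpl(InputStream in) throws IOException {
        final byte[] tmp = new byte[Math.min(readBuf.remaining(), 8192)];
        final int read = in.read(tmp, 0, tmp.length);
        if (read > 0) {
          readBuf.put(tmp, 0, read);
        }
        return read;
      }

      // The fixed skeleton: cap the buffer to this round's quota, delegate,
      // and always restore the caller's limit afterwards.
      final int readFromBlock(InputStream in, int numBytesToRead) throws IOException {
        final int savedLimit = readBuf.limit();
        if (numBytesToRead < readBuf.remaining()) {
          readBuf.limit(readBuf.position() + numBytesToRead);
        }
        try {
          return readImpl(in);
        } finally {
          readBuf.limit(savedLimit);
        }
      }
    }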
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
index 0ff7f1fdd78..75e483ad55e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
@@ -36,6 +36,17 @@ public abstract class ExtendedInputStream extends InputStream
 
   protected static final int EOF = -1;
 
+  /**
+   * Positioned read.
+   *
+   * @param position the starting position of the read.
+   * @param buffer the buffer for storing the data.
+   * @return true iff positioned read is supported in this implementation.
+   */
+  public boolean readFully(long position, ByteBuffer buffer) throws IOException {
+    return false;
+  }
+
   @Override
   public synchronized int read() throws IOException {
     byte[] buf = new byte[1];
@@ -47,19 +58,16 @@ public synchronized int read() throws IOException {
 
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
-    ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
-    int bufferLen = strategy.getTargetLength();
-    if (bufferLen == 0) {
-      return 0;
-    }
-    return readWithStrategy(strategy);
+    return read(new ByteArrayReader(b, off, len));
   }
 
   @Override
   public synchronized int read(ByteBuffer byteBuffer) throws IOException {
-    ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
-    int bufferLen = strategy.getTargetLength();
-    if (bufferLen == 0) {
+    return read(new ByteBufferReader(byteBuffer));
+  }
+
+  public synchronized int read(ByteReaderStrategy strategy) throws IOException {
+    if (strategy.getTargetLength() == 0) {
       return 0;
     }
     return readWithStrategy(strategy);
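
The default readFully(long, ByteBuffer) above acts as a capability probe: it returns false when a subclass has no positioned-read fast path, letting callers try it first and fall back without instanceof checks against concrete stream classes. A hypothetical caller-side sketch of that contract (the interface and method names here are illustrative stand-ins, not Ozone types):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    interface MaybePositionedReadable {
      // Returns true iff this stream served the positioned read itself;
      // false means "not supported here, use the generic path".
      boolean readFully(long position, ByteBuffer buffer) throws IOException;
    }

    class ProbeCaller {
      static int positionedRead(MaybePositionedReadable in, long pos, ByteBuffer buf)
          throws IOException {
        final int before = buf.remaining();
        if (in.readFully(pos, buf)) {
          return before - buf.remaining();  // bytes read = remaining() delta
        }
        return -1;  // caller falls back to seek + sequential read (elided)
      }
    }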
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
index e48b704aade..a28658b1ebb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
@@ -18,13 +18,16 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.ratis.util.Preconditions;
 
 /**
  * A stream for accessing multipart streams.
@@ -36,6 +39,7 @@ public class MultipartInputStream extends ExtendedInputStream {
 
   // List of PartInputStream, one for each part of the key
   private final List<? extends PartInputStream> partStreams;
+  private final boolean isStreamBlockInputStream;
 
   // partOffsets[i] stores the index of the first data byte in
   // partStream w.r.t the whole key data.
@@ -58,11 +62,11 @@ public class MultipartInputStream extends ExtendedInputStream {
 
   public MultipartInputStream(String keyName,
                               List<? extends PartInputStream> inputStreams) {
-
-    Preconditions.checkNotNull(inputStreams);
+    Objects.requireNonNull(inputStreams, "inputStreams == null");
 
     this.key = keyName;
-    this.partStreams = inputStreams;
+    this.partStreams = Collections.unmodifiableList(inputStreams);
+    this.isStreamBlockInputStream = !inputStreams.isEmpty() && inputStreams.get(0) instanceof StreamBlockInputStream;
 
     // Calculate and update the partOffsets
     this.partOffsets = new long[inputStreams.size()];
@@ -70,6 +74,9 @@ public MultipartInputStream(String keyName,
     long streamLength = 0L;
     for (PartInputStream partInputStream : inputStreams) {
       this.partOffsets[i++] = streamLength;
+      if (isStreamBlockInputStream) {
+        Preconditions.assertInstanceOf(partInputStream, StreamBlockInputStream.class);
+      }
       streamLength += partInputStream.getLength();
     }
     this.length = streamLength;
@@ -78,7 +85,7 @@ public MultipartInputStream(String keyName,
   @Override
   protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
       throws IOException {
-    Preconditions.checkArgument(strategy != null);
+    Objects.requireNonNull(strategy, "strategy == null");
     checkOpen();
 
     int totalReadLen = 0;
@@ -176,6 +183,28 @@ public synchronized void seek(long pos) throws IOException {
     prevPartIndex = partIndex;
   }
 
+  @Override
+  public boolean readFully(long position, ByteBuffer buffer) throws IOException {
+    if (!isStreamBlockInputStream) {
+      return false;
+    }
+
+    final long oldPos = getPos();
+    seek(position);
+    try {
+      read(new ByteBufferReader(buffer) {
+        @Override
+        int readImpl(InputStream inputStream) throws IOException {
+          return Preconditions.assertInstanceOf(inputStream, StreamBlockInputStream.class)
+              .readFully(getBuffer(), false);
+        }
+      });
+    } finally {
+      seek(oldPos);
+    }
+    return true;
+  }
+
   public synchronized void initialize() throws IOException {
     // Pre-check that the stream has not been initialized already
     if (initialized) {
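
MultipartInputStream.readFully above reuses the sequential machinery: remember the cursor, seek to the requested offset, read with pre-read disabled, then restore the cursor in a finally block so even a failed read leaves the stream position intact. A simplified sketch of that shape (stand-in abstract stream, not the real class):

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    abstract class SeekableSketch {
      abstract long getPos();
      abstract void seek(long pos) throws IOException;
      abstract int read(ByteBuffer buf) throws IOException;  // returns -1 at EOF

      final void readAt(long position, ByteBuffer buffer) throws IOException {
        final long oldPos = getPos();        // remember the sequential cursor
        seek(position);
        try {
          while (buffer.hasRemaining()) {
            if (read(buffer) < 0) {
              throw new EOFException("EOF at " + getPos());
            }
          }
        } finally {
          seek(oldPos);  // a positioned read must not move the cursor
        }
      }
    }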
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index 0d1e8ff1003..72e5d11edc4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -19,6 +19,7 @@
 
 import static org.apache.ratis.thirdparty.io.grpc.Status.Code.CANCELLED;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
@@ -27,13 +28,11 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -51,6 +50,7 @@
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.util.Preconditions;
@@ -61,11 +61,13 @@
  * An {@link java.io.InputStream} called from KeyInputStream to read a block from the
  * container.
  */
-public class StreamBlockInputStream extends BlockExtendedInputStream
-    implements Seekable, CanUnbuffer, ByteBufferReadable {
+public class StreamBlockInputStream extends BlockExtendedInputStream {
   private static final Logger LOG = LoggerFactory.getLogger(StreamBlockInputStream.class);
   private static final int EOF = -1;
+  private static final AtomicInteger STREAM_ID = new AtomicInteger(0);
+  private static final AtomicInteger READER_ID = new AtomicInteger(0);
 
+  private final String name = "stream" + STREAM_ID.getAndIncrement();
   private final BlockID blockID;
   private final long blockLength;
   private final int responseDataSize = 1 << 20; // 1 MB
@@ -119,7 +121,7 @@ public synchronized long getPos() {
   @Override
   public synchronized int read() throws IOException {
     checkOpen();
-    if (!dataAvailableToRead(1)) {
+    if (!dataAvailableToRead(1, true)) {
       return EOF;
     }
     position++;
@@ -134,10 +136,14 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
 
   @Override
   public synchronized int read(ByteBuffer targetBuf) throws IOException {
+    return readFully(targetBuf, true);
+  }
+
+  synchronized int readFully(ByteBuffer targetBuf, boolean preRead) throws IOException {
     checkOpen();
     int read = 0;
     while (targetBuf.hasRemaining()) {
-      if (!dataAvailableToRead(targetBuf.remaining())) {
+      if (!dataAvailableToRead(targetBuf.remaining(), preRead)) {
         break;
       }
       int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
@@ -151,7 +157,7 @@ public synchronized int read(ByteBuffer targetBuf) throws IOException {
     return read > 0 ? read : EOF;
   }
 
-  private synchronized boolean dataAvailableToRead(int length) throws IOException {
+  private synchronized boolean dataAvailableToRead(int length, boolean preRead) throws IOException {
     if (position >= blockLength) {
       return false;
     }
@@ -160,7 +166,7 @@ private synchronized boolean dataAvailableToRead(int length) throws IOException
     if (bufferHasRemaining()) {
       return true;
     }
-    buffer = streamingReader.read(length);
+    buffer = streamingReader.read(length, preRead);
     return bufferHasRemaining();
   }
 
@@ -180,11 +186,12 @@ public synchronized void seek(long pos) throws IOException {
       throw new IOException("Cannot seek to negative offset");
     }
     if (pos > blockLength) {
-      throw new IOException("Cannot seek after the end of the block");
+      throw new EOFException("Failed to seek to position " + pos + " > block length = " + blockLength);
     }
     if (pos == position) {
       return;
     }
+    LOG.debug("{}: seek {} -> {}", this, position, pos);
     closeStream();
     position = pos;
     requestedLength = pos;
@@ -204,6 +211,7 @@ public synchronized void unbuffer() {
 
   private synchronized void closeStream() {
     if (streamingReader != null) {
+      LOG.debug("Closing {}", streamingReader);
       streamingReader.onCompleted();
       streamingReader = null;
     }
@@ -252,13 +260,14 @@ private synchronized void initialize() throws IOException {
     }
   }
 
-  synchronized void readBlock(int length) throws IOException {
+  synchronized void readBlock(int length, boolean preRead) throws IOException {
     final long required = position + length - requestedLength;
-    final long readLength = required + preReadSize;
+    final long preReadLength = preRead ? preReadSize : 0;
+    final long readLength = required + preReadLength;
 
     if (readLength > 0) {
       LOG.debug("position {}, length {}, requested {}, diff {}, readLength {}, 
preReadSize={}",
-          position, length, requestedLength, required, readLength, 
preReadSize);
+          position, length, requestedLength, required, readLength, 
preReadLength);
       readBlockImpl(readLength);
       requestedLength += readLength;
     }
@@ -314,10 +323,16 @@ private synchronized void releaseStreamResources() {
     }
   }
 
+  @Override
+  public String toString() {
+    return name;
+  }
+
   /**
   * Implementation of a StreamObserver used to receive and buffer streaming GRPC reads.
    */
   public class StreamingReader implements StreamingReaderSpi {
+    private final String name = StreamBlockInputStream.this.name + "-reader" + READER_ID.getAndIncrement();
 
     /** Response queue: poll is blocking while offer is non-blocking. */
     private final BlockingQueue<ReadBlockResponseProto> responseQueue = new LinkedBlockingQueue<>();
@@ -336,7 +351,11 @@ void checkError() throws IOException {
       }
     }
 
-    ReadBlockResponseProto poll() throws IOException {
+    ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws IOException {
+      final long timeoutNanos = timeoutUnit.toNanos(timeout);
+      final long startTime = System.nanoTime();
+      final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000);
+
       while (true) {
         checkError();
         if (future.isDone()) {
@@ -345,7 +364,7 @@ ReadBlockResponseProto poll() throws IOException {
 
         final ReadBlockResponseProto proto;
         try {
-          proto = responseQueue.poll(1, TimeUnit.SECONDS);
+          proto = responseQueue.poll(pollTimeoutNanos, TimeUnit.NANOSECONDS);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IOException("Interrupted while waiting for response", e);
@@ -353,46 +372,53 @@ ReadBlockResponseProto poll() throws IOException {
         if (proto != null) {
           return proto;
         }
+
+        final long elapsedNanos = System.nanoTime() - startTime;
+        if (elapsedNanos >= timeoutNanos) {
+          setFailedAndThrow(new TimeoutIOException(
+              "Timed out " + timeout + " " + timeoutUnit + " waiting for 
response"));
+          return null;
+        }
       }
     }
 
-    private ByteBuffer read(int length) throws IOException {
+    private ByteBuffer read(int length, boolean preRead) throws IOException {
       checkError();
       if (future.isDone()) {
         return null; // Stream ended
       }
 
-      readBlock(length);
+      readBlock(length, preRead);
 
       while (true) {
         final ByteBuffer buf = readFromQueue();
-        if (buf.hasRemaining()) {
+        if (buf != null && buf.hasRemaining()) {
           return buf;
         }
       }
     }
 
     ByteBuffer readFromQueue() throws IOException {
-      final ReadBlockResponseProto readBlock = poll();
+      final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS);
       // The server always returns data starting from the last checksum boundary. Therefore if the reader position is
       // ahead of the position we received from the server, we need to adjust the buffer position accordingly.
       // If the reader position is behind
-      ByteBuffer buf = readBlock.getData().asReadOnlyByteBuffer();
-      long blockOffset = readBlock.getOffset();
-      long pos = getPos();
+      final ByteString data = readBlock.getData();
+      final ByteBuffer dataBuffer = data.asReadOnlyByteBuffer();
+      final long blockOffset = readBlock.getOffset();
+      final long pos = getPos();
       if (pos < blockOffset) {
         // This should not happen, and if it does, we have a bug.
-        throw new IOException("Received data out of order. Position is " + pos 
+ " but received data at "
-            + blockOffset);
+        setFailedAndThrow(new IllegalStateException(
+            this + ": out of order, position " + pos + " < block offset " + 
blockOffset));
       }
-      if (pos > readBlock.getOffset()) {
-        int offset = (int)(pos - readBlock.getOffset());
-        if (offset > buf.limit()) {
-          offset = buf.limit();
-        }
-        buf.position(offset);
+      final long offset = pos - blockOffset;
+      if (offset > 0) {
+        dataBuffer.position(Math.toIntExact(Math.min(offset, dataBuffer.limit())));
       }
-      return buf;
+      LOG.debug("{}: return response positon {}, length {} (block offset {}, 
length {})",
+          name, pos, dataBuffer.remaining(), blockOffset, data.size());
+      return dataBuffer;
     }
 
     private void releaseResources() {
@@ -448,20 +474,32 @@ StreamingReadResponse getResponse() {
       return response.get();
     }
 
-    private void setFailed(Throwable throwable) {
+    private <T extends Throwable> void setFailedAndThrow(T throwable) throws T {
+      if (setFailed(throwable)) {
+        throw throwable;
+      }
+    }
+
+    private boolean setFailed(Throwable throwable) {
       final boolean completed = future.completeExceptionally(throwable);
       if (!completed) {
-        LOG.warn("Already failed: suppressed ", throwable);
+        LOG.warn("{}: Already completed, suppress ", this, throwable);
       }
+      return completed;
     }
 
     private void setCompleted() {
       final boolean changed = future.complete(null);
-      if (!changed) {
+      if (changed) {
+        LOG.debug("{} setCompleted success", this);
+      } else {
         try {
           future.get();
-        } catch (InterruptedException | ExecutionException e) {
-          LOG.warn("Failed to setCompleted", e);
+          LOG.debug("{} Failed to setCompleted: Already completed", this);
+        } catch (InterruptedException e) {
+          LOG.warn("{}: Interrupted setCompleted", this, e);
+        } catch (ExecutionException e) {
+          LOG.warn("{}: Failed to setCompleted: already completed 
exceptionally", this, e);
         }
       }
 
@@ -471,8 +509,9 @@ private void setCompleted() {
     private void offerToQueue(ReadBlockResponseProto item) {
       if (LOG.isDebugEnabled()) {
         final ContainerProtos.ChecksumData checksumData = item.getChecksumData();
-        LOG.debug("offerToQueue {} bytes, numChecksums {}, 
bytesPerChecksum={}",
-            item.getData().size(), checksumData.getChecksumsList().size(), 
checksumData.getBytesPerChecksum());
+        LOG.debug("{}: enqueue response offset {}, length {}, numChecksums {}, 
bytesPerChecksum={}",
+            name, item.getOffset(), item.getData().size(),
+            checksumData.getChecksumsList().size(), 
checksumData.getBytesPerChecksum());
       }
       final boolean offered = responseQueue.offer(item);
       Preconditions.assertTrue(offered, () -> "Failed to offer " + item);
@@ -483,5 +522,10 @@ public void setStreamingReadResponse(StreamingReadResponse streamingReadResponse
       final boolean set = response.compareAndSet(null, streamingReadResponse);
       Preconditions.assertTrue(set, () -> "Failed to set streamingReadResponse");
     }
+
+    @Override
+    public String toString() {
+      return name;
+    }
   }
 }
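
The reworked poll(...) above replaces an unbounded wait with a deadline: it polls the response queue in short slices (a tenth of the budget, capped at 100 ms) so that errors and stream completion are noticed promptly, and fails the stream with a TimeoutIOException once the budget is exhausted. A standalone sketch of that loop shape (generic queue, no Ozone types; side-condition checks are elided):

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class BoundedPoll {
      static <T> T pollWithDeadline(BlockingQueue<T> queue, long timeout, TimeUnit unit)
          throws IOException {
        final long timeoutNanos = unit.toNanos(timeout);
        final long start = System.nanoTime();
        // Short slice so failures detected elsewhere are noticed quickly.
        final long sliceNanos = Math.min(timeoutNanos / 10, 100_000_000L);
        while (true) {
          // (a real implementation would re-check error/completed state here)
          final T item;
          try {
            item = queue.poll(sliceNanos, TimeUnit.NANOSECONDS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("Interrupted while waiting for response");
          }
          if (item != null) {
            return item;
          }
          if (System.nanoTime() - start >= timeoutNanos) {
            throw new IOException("Timed out " + timeout + " " + unit + " waiting for response");
          }
        }
      }
    }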
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index ad8929bdfb8..44b753210d9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -56,7 +56,13 @@ public class TestStreamBlockInputStream extends TestInputStreamBase {
     GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
     GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
     GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"), Level.ERROR);
     GenericTestUtils.setLogLevel(GrpcXceiverService.class, Level.ERROR);
+
+//    GenericTestUtils.setLogLevel(LoggerFactory.getLogger(StreamBlockInputStream.class), Level.TRACE);
+//    GenericTestUtils.setLogLevel(LoggerFactory.getLogger(XceiverClientGrpc.class), Level.TRACE);
   }
 
   /**
@@ -94,6 +100,8 @@ void runTestReadKey(int keyLength, boolean randomReadOffset, OzoneConfiguration
       LOG.info("---------------------------------------------------------");
       LOG.info("writeRandomBytes {} bytes", inputData.length);
 
+      runTestPositionedRead(keyName, ByteBuffer.wrap(new byte[inputData.length]));
+
       for (int i = 1; i <= 10; i++) {
         runTestReadKey(keyName, keyLength / i, randomReadOffset, keyLength);
       }
@@ -129,6 +137,53 @@ private void runTestReadKey(String key, int bufferSize, boolean randomReadOffset
     }
   }
 
+  void runTestPositionedRead(String key, ByteBuffer buffer) throws Exception {
+    try (KeyInputStream in = bucket.getKeyInputStream(key)) {
+      runTestPositionedRead(buffer, in, 0, 0);
+      runTestPositionedRead(buffer, in, 0, 1);
+      runTestPositionedRead(buffer, in, inputData.length, 0);
+      runTestPositionedRead(buffer, in, inputData.length - 1, 1);
+      for (int i = 0; i < 5; i++) {
+        runTestPositionedRead(buffer, in);
+      }
+    }
+  }
+
+  void runTestPositionedRead(ByteBuffer buffer, KeyInputStream in) throws Exception {
+    final int position = ThreadLocalRandom.current().nextInt(inputData.length - 1);
+    runTestPositionedRead(buffer, in, position, 0);
+    runTestPositionedRead(buffer, in, position, 1);
+    final int n = 2 + ThreadLocalRandom.current().nextInt(inputData.length - 1 - position);
+    runTestPositionedRead(buffer, in, position, n);
+  }
+
+  void runTestPositionedRead(ByteBuffer buffer, KeyInputStream in, int pos, int length) throws Exception {
+    LOG.info("runTestPositionedRead: position={}, length={}", pos, length);
+    assertTrue(pos + length <= inputData.length);
+    buffer = buffer.duplicate();
+
+    // seek and read
+    buffer.position(0).limit(length);
+    in.seek(pos);
+    while (buffer.hasRemaining()) {
+      in.read(buffer);
+    }
+    assertData(pos, length, buffer);
+
+    // positioned read
+    buffer.position(0).limit(length);
+    in.readFully(pos, buffer);
+    assertData(pos, length, buffer);
+  }
+
+  void assertData(int pos, int length, ByteBuffer buffer) {
+    buffer.flip();
+    assertEquals(length, buffer.remaining());
+    for (int i = 0; i < length; i++) {
+      assertEquals(inputData[pos + i], buffer.get(i), "pos=" + pos + ", i=" + i);
+    }
+  }
+
   @Test
   void testAll() throws Exception {
     try (MiniOzoneCluster cluster = newCluster()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
index 601246f0752..29a43876630 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
@@ -17,7 +17,7 @@
 
 package org.apache.hadoop.ozone.client.rpc.read;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.OutputStream;
@@ -50,8 +50,6 @@
  * Tests {@link StreamBlockInputStream}.
  */
 public class TestStreamRead {
-  private TestBucket bucket;
-
   {
     GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
     
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), 
Level.ERROR);
@@ -70,8 +68,8 @@ public class TestStreamRead {
   static final int FLUSH_SIZE = 2 * CHUNK_SIZE;       // 2MB
   static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;   // 4MB
 
-  static final int BLOCK_SIZE = 128 << 20;
-  static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("256M");
+  static final int BLOCK_SIZE = 64 << 20;
+  static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M");
 
   static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
     final OzoneConfiguration conf = new OzoneConfiguration();
@@ -93,7 +91,7 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
         .applyTo(conf);
 
     return MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3)
+        .setNumDatanodes(1)
         .build();
   }
 
@@ -127,7 +125,7 @@ void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Ex
       OzoneConfiguration copy = new OzoneConfiguration(conf);
       copy.setFromObject(clientConfig);
 
-      final int n = 10;
+      final int n = 5;
       final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
       final SizeInBytes[] readBufferSizes = {
           SizeInBytes.valueOf("32M"),
@@ -137,7 +135,7 @@ void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Ex
       };
 
       try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
-        bucket = TestBucket.newBuilder(client).build();
+        final TestBucket bucket = TestBucket.newBuilder(client).build();
 
         for (int i = 0; i < n; i++) {
           final String keyName = "key" + i;
@@ -147,8 +145,8 @@ void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Ex
 
          final String md5 = createKey(bucket.delegate(), keyName, keySize, writeBufferSize);
           for (SizeInBytes readBufferSize : readBufferSizes) {
-            runTestReadKey(keyName, keySize, readBufferSize, null);
-            runTestReadKey(keyName, keySize, readBufferSize, md5);
+            runTestReadKey(keyName, keySize, readBufferSize, null, bucket);
+            runTestReadKey(keyName, keySize, readBufferSize, md5, bucket);
           }
         }
       }
@@ -170,7 +168,7 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
     final long keySizeByte = keySize.getSize();
     final long startTime = System.nanoTime();
     try (OutputStream stream = bucket.createStreamKey(keyName, keySizeByte,
-        RatisReplicationConfig.getInstance(THREE), Collections.emptyMap())) {
+        RatisReplicationConfig.getInstance(ONE), Collections.emptyMap())) {
       for (long pos = 0; pos < keySizeByte;) {
        final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
         stream.write(buffer, 0, writeSize);
@@ -191,8 +189,8 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
     return computedMD5;
   }
 
-  private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5)
-      throws Exception {
+  private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5,
+      TestBucket bucket) throws Exception {
     final long keySizeByte = keySize.getSize();
     final MessageDigest md5 = MessageDigest.getInstance("MD5");
     // Read the data fully into a large enough byte array
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index e4133ae57a5..e640c1e6d17 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.scm.storage.ExtendedInputStream;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 
 /**
@@ -168,6 +169,13 @@ public int read(long position, ByteBuffer buf) throws IOException {
     if (!buf.hasRemaining()) {
       return 0;
     }
+    if (inputStream instanceof ExtendedInputStream) {
+      final int remainingBeforeRead = buf.remaining();
+      if (((ExtendedInputStream) inputStream).readFully(position, buf)) {
+        return remainingBeforeRead - buf.remaining();
+      }
+    }
+
     long oldPos = this.getPos();
     int bytesRead;
     try {
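
End to end, OzoneFSInputStream.read(long position, ByteBuffer buf) is what Hadoop's ByteBufferPositionedReadable pread path invokes, so with this patch stream-read keys serve positioned reads without pre-read before falling back to the seek-based path. A hypothetical usage sketch from the FileSystem side (assumes a Hadoop 3.3+ client where FSDataInputStream exposes ByteBufferPositionedReadable; the ofs:// path and offset are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PositionedReadExample {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        final Path path = new Path("ofs://om/vol/bucket/key");  // illustrative path
        try (FileSystem fs = path.getFileSystem(conf);
             FSDataInputStream in = fs.open(path)) {
          final ByteBuffer buf = ByteBuffer.allocate(4096);
          // Dispatches to OzoneFSInputStream.read(long, ByteBuffer); the
          // ExtendedInputStream.readFully fast path is tried first.
          final int n = in.read(1 << 20, buf);
          System.out.println("read " + n + " bytes at offset " + (1 << 20));
        }
      }
    }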

