This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new d37fe69  HDDS-5552. EC: Implement seek on ECBlockInputStream (#2723)
d37fe69 is described below

commit d37fe6901b579f0c6f7c433e954a1e5b38ed32df
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Oct 13 17:32:34 2021 +0100

    HDDS-5552. EC: Implement seek on ECBlockInputStream (#2723)
---
 .../hdds/scm/storage/ExtendedInputStream.java      |  2 +-
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 45 +++++++++++---
 .../client/rpc/read/TestECBlockInputStream.java    | 71 +++++++++++++++++++++-
 .../ozone/client/rpc/read/TestInputStreamBase.java | 23 +++++--
 .../ozone/client/rpc/read/TestKeyInputStream.java  | 52 ++++++++++++++++
 5 files changed, 180 insertions(+), 13 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
index d09afe1..de868c7 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
@@ -81,7 +81,7 @@ public abstract class ExtendedInputStream extends InputStream
 
   @Override
   public synchronized void seek(long l) throws IOException {
-    throw new NotImplementedException("Seek is not implements for EC");
+    throw new NotImplementedException("Seek is not implemented");
   }
 
   @Override
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index a84452d..ae030b5 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -34,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.function.Function;
@@ -48,6 +48,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
 
   private final ECReplicationConfig repConfig;
   private final int ecChunkSize;
+  private final long stripeSize;
   private final BlockInputStreamFactory streamFactory;
   private final boolean verifyChecksum;
   private final XceiverClientFactory xceiverClientFactory;
@@ -60,6 +61,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
 
   private long position = 0;
   private boolean closed = false;
+  private boolean seeked = false;
 
   public ECBlockInputStream(ECReplicationConfig repConfig,
       OmKeyLocationInfo blockInfo, boolean verifyChecksum,
@@ -77,6 +79,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
     this.parityLocations = new DatanodeDetails[repConfig.getParity()];
     this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
 
+    stripeSize = ecChunkSize * repConfig.getData();
     setBlockLocations(this.blockInfo.getPipeline());
   }
 
@@ -158,8 +161,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
    * @return
    */
   private long internalBlockLength(int index) {
-    long lastStripe = blockInfo.getLength()
-        % ((long)ecChunkSize * repConfig.getData());
+    long lastStripe = blockInfo.getLength() % stripeSize;
     long blockSize = (blockInfo.getLength() - lastStripe) / 
repConfig.getData();
     long lastCell = lastStripe / ecChunkSize + 1;
     long lastCellLength = lastStripe % ecChunkSize;
@@ -255,8 +257,23 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
   private int readFromStream(BlockExtendedInputStream stream,
       ByteReaderStrategy strategy)
       throws IOException {
-    // Number of bytes left to read from this streams EC cell.
-    long ecLimit = ecChunkSize - (position % ecChunkSize);
+    long partialPosition = position % ecChunkSize;
+    if (seeked) {
+      // Seek on the underlying streams is performed lazily, as there is a
+      // possibility a read after a seek may only read a small amount of data.
+      // Once this block stream has been seeked, we always check the position,
+      // but in the usual case, where there are no seeks at all, we don't need
+      // to do this extra work.
+      long basePosition = (position / stripeSize) * (long)ecChunkSize;
+      long streamPosition = basePosition + partialPosition;
+      if (streamPosition != stream.getPos()) {
+        // This ECBlockInputStream has been seeked, so the underlying
+        // block stream is no longer at the correct position. Therefore we need
+        // to seek it too.
+        stream.seek(streamPosition);
+      }
+    }
+    long ecLimit = ecChunkSize - partialPosition;
     // Free space in the buffer to read into
     long bufLimit = strategy.getTargetLength();
     // How much we can read, the lower of the EC Cell, buffer and overall block
@@ -309,8 +326,22 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
   }
 
   @Override
-  public synchronized void seek(long l) throws IOException {
-    throw new NotImplementedException("Seek is not implements for EC");
+  public synchronized void seek(long pos) throws IOException {
+    checkOpen();
+    if (pos < 0 || pos >= getLength()) {
+      if (pos == 0) {
+        // It is possible for both the length and pos to be zero, in which
+        // case seek should simply return instead of throwing an exception.
+        return;
+      }
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for block: "
+              + blockInfo.getBlockID());
+    }
+    if (position != pos) {
+      position = pos;
+      seeked = true;
+    }
   }
 
   @Override
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 647483a..aa22a0b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -36,6 +36,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -281,6 +282,69 @@ public class TestECBlockInputStream {
     }
   }
 
+  @Test(expected = EOFException.class)
+  public void testSeekPastBlockLength() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      ecb.seek(1000);
+    }
+  }
+
+  @Test(expected = EOFException.class)
+  public void testSeekToLength() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      ecb.seek(100);
+    }
+  }
+
+  @Test
+  public void testSeekToLengthZeroLengthBlock() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 0);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      ecb.seek(0);
+      Assert.assertEquals(0, ecb.getPos());
+      Assert.assertEquals(0, ecb.getRemaining());
+    }
+  }
+
+  @Test
+  public void testSeekToValidPosition() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      ecb.seek(ONEMB - 1);
+      Assert.assertEquals(ONEMB - 1, ecb.getPos());
+      Assert.assertEquals(ONEMB * 4 + 1, ecb.getRemaining());
+      // First read should read the last byte of the first chunk
+      Assert.assertEquals(0, ecb.read());
+      Assert.assertEquals(ONEMB,
+          streamFactory.getBlockStreams().get(0).position);
+      // Second read should be the first byte of the second chunk.
+      Assert.assertEquals(1, ecb.read());
+
+      // Seek to the end of the file minus one byte
+      ecb.seek(ONEMB * 5 - 1);
+      Assert.assertEquals(1, ecb.read());
+      Assert.assertEquals(ONEMB * 2,
+          streamFactory.getBlockStreams().get(1).position);
+      // Second read should be EOF as there is no data left
+      Assert.assertEquals(-1, ecb.read());
+      Assert.assertEquals(0, ecb.getRemaining());
+    }
+  }
+
   private void validateBufferContents(ByteBuffer buf, int from, int to,
       byte val) {
     for (int i=from; i<to; i++){
@@ -412,8 +476,13 @@ public class TestECBlockInputStream {
     }
 
     @Override
+    public void seek(long pos) {
+      this.position = pos;
+    }
+
+    @Override
     public long getPos() {
-      return 0;
+      return position;
     }
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 7a23715..dfd2a64 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -23,7 +23,8 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -48,6 +49,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@@ -113,7 +115,7 @@ public abstract class TestInputStreamBase {
     conf.setFromObject(repConf);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(4)
+        .setNumDatanodes(5)
         .setTotalPipelineNumLimit(5)
         .setBlockSize(BLOCK_SIZE)
         .setChunkSize(CHUNK_SIZE)
@@ -168,8 +170,14 @@ public abstract class TestInputStreamBase {
   }
 
   byte[] writeKey(String keyName, int dataLength) throws Exception {
+    ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
+    return writeKey(keyName, repConfig, dataLength);
+  }
+
+  byte[] writeKey(String keyName, ReplicationConfig repConfig, int dataLength)
+      throws Exception {
     OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+        repConfig, 0, objectStore, volumeName, bucketName);
 
     byte[] inputData = ContainerTestHelper.getFixedLengthString(
         keyString, dataLength).getBytes(UTF_8);
@@ -181,8 +189,15 @@ public abstract class TestInputStreamBase {
 
   byte[] writeRandomBytes(String keyName, int dataLength)
       throws Exception {
+    ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
+    return writeRandomBytes(keyName, repConfig, dataLength);
+  }
+
+  byte[] writeRandomBytes(String keyName, ReplicationConfig repConfig,
+      int dataLength)
+      throws Exception {
     OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+        repConfig, 0, objectStore, volumeName, bucketName);
 
     byte[] inputData = new byte[dataLength];
     RAND.nextBytes(inputData);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
index 89e78e9..4946fee 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
@@ -23,8 +23,10 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -51,6 +53,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
 import static org.junit.Assert.fail;
@@ -96,6 +99,24 @@ public class TestKeyInputStream extends TestInputStreamBase {
   }
 
   /**
+   * This method performs random seeks and reads, and validates that the
+   * data read back is correct.
+   * @param dataLength
+   * @param keyInputStream
+   * @param inputData
+   * @param readSize
+   * @throws Exception
+   */
+  private void randomPositionSeek(int dataLength, KeyInputStream 
keyInputStream,
+      byte[] inputData, int readSize) throws Exception {
+    Random rand = new Random();
+    for (int i = 0; i < 100; i++) {
+      int position = rand.nextInt(dataLength - readSize);
+      validate(keyInputStream, inputData, position, readSize);
+    }
+  }
+
+  /**
    * This method seeks to specified seek value and read the data specified by
    * readLength and validate the read is correct or not.
    * @param keyInputStream
@@ -126,6 +147,7 @@ public class TestKeyInputStream extends TestInputStreamBase 
{
     testReadChunkWithByteArray();
     testReadChunkWithByteBuffer();
     testSkip();
+    testECSeek();
   }
 
   public void testInputStreams() throws Exception {
@@ -198,6 +220,36 @@ public class TestKeyInputStream extends 
TestInputStreamBase {
     keyInputStream.close();
   }
 
+  public void testECSeek() throws Exception {
+    int ecChunkSize = 1024 * 1024;
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS,
+        ecChunkSize);
+    String keyName = getNewKeyName();
+    // 3 full EC blocks plus one chunk
+    int dataLength = (9 * BLOCK_SIZE + ecChunkSize);
+
+    byte[] inputData = writeRandomBytes(keyName, repConfig, dataLength);
+    try(KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+      validate(keyInputStream, inputData, 0, ecChunkSize + 1234);
+
+      validate(keyInputStream, inputData, 200, ecChunkSize);
+
+      validate(keyInputStream, inputData, BLOCK_SIZE * 4, ecChunkSize);
+
+      validate(keyInputStream, inputData, BLOCK_SIZE * 4 + 200, ecChunkSize);
+
+      validate(keyInputStream, inputData, dataLength - ecChunkSize - 100,
+          ecChunkSize + 50);
+
+      randomPositionSeek(dataLength, keyInputStream, inputData,
+          ecChunkSize + 200);
+
+      // Read entire key.
+      validate(keyInputStream, inputData, 0, dataLength);
+    }
+  }
+
   public void testSeek() throws Exception {
     XceiverClientManager.resetXceiverClientMetrics();
     XceiverClientMetrics metrics = XceiverClientManager

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to