This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new d37fe69 HDDS-5552. EC: Implement seek on ECBlockInputStream (#2723)
d37fe69 is described below
commit d37fe6901b579f0c6f7c433e954a1e5b38ed32df
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Oct 13 17:32:34 2021 +0100
HDDS-5552. EC: Implement seek on ECBlockInputStream (#2723)
---
.../hdds/scm/storage/ExtendedInputStream.java | 2 +-
.../hadoop/ozone/client/io/ECBlockInputStream.java | 45 +++++++++++---
.../client/rpc/read/TestECBlockInputStream.java | 71 +++++++++++++++++++++-
.../ozone/client/rpc/read/TestInputStreamBase.java | 23 +++++--
.../ozone/client/rpc/read/TestKeyInputStream.java | 52 ++++++++++++++++
5 files changed, 180 insertions(+), 13 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
index d09afe1..de868c7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
@@ -81,7 +81,7 @@ public abstract class ExtendedInputStream extends InputStream
@Override
public synchronized void seek(long l) throws IOException {
- throw new NotImplementedException("Seek is not implements for EC");
+ throw new NotImplementedException("Seek is not implemented");
}
@Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index a84452d..ae030b5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.client.io;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -34,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.function.Function;
@@ -48,6 +48,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private final ECReplicationConfig repConfig;
private final int ecChunkSize;
+ private final long stripeSize;
private final BlockInputStreamFactory streamFactory;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
@@ -60,6 +61,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private long position = 0;
private boolean closed = false;
+ private boolean seeked = false;
public ECBlockInputStream(ECReplicationConfig repConfig,
OmKeyLocationInfo blockInfo, boolean verifyChecksum,
@@ -77,6 +79,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
this.parityLocations = new DatanodeDetails[repConfig.getParity()];
this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
+ stripeSize = ecChunkSize * repConfig.getData();
setBlockLocations(this.blockInfo.getPipeline());
}
@@ -158,8 +161,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
* @return
*/
private long internalBlockLength(int index) {
- long lastStripe = blockInfo.getLength()
- % ((long)ecChunkSize * repConfig.getData());
+ long lastStripe = blockInfo.getLength() % stripeSize;
long blockSize = (blockInfo.getLength() - lastStripe) /
repConfig.getData();
long lastCell = lastStripe / ecChunkSize + 1;
long lastCellLength = lastStripe % ecChunkSize;
@@ -255,8 +257,23 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private int readFromStream(BlockExtendedInputStream stream,
ByteReaderStrategy strategy)
throws IOException {
- // Number of bytes left to read from this streams EC cell.
- long ecLimit = ecChunkSize - (position % ecChunkSize);
+ long partialPosition = position % ecChunkSize;
+ if (seeked) {
+ // Seek on the underlying streams is performed lazily, as there is a
+ // possibility a read after a seek may only read a small amount of data.
+ // Once this block stream has been seeked, we always check the position,
+ // but in the usual case, where there are no seeks at all, we don't need
+ // to do this extra work.
+ long basePosition = (position / stripeSize) * (long)ecChunkSize;
+ long streamPosition = basePosition + partialPosition;
+ if (streamPosition != stream.getPos()) {
+ // This ECBlockInputStream has been seeked, so the underlying
+ // block stream is no longer at the correct position. Therefore we need
+ // to seek it too.
+ stream.seek(streamPosition);
+ }
+ }
+ long ecLimit = ecChunkSize - partialPosition;
// Free space in the buffer to read into
long bufLimit = strategy.getTargetLength();
// How much we can read, the lower of the EC Cell, buffer and overall block
@@ -309,8 +326,22 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
}
@Override
- public synchronized void seek(long l) throws IOException {
- throw new NotImplementedException("Seek is not implements for EC");
+ public synchronized void seek(long pos) throws IOException {
+ checkOpen();
+ if (pos < 0 || pos >= getLength()) {
+ if (pos == 0) {
+ // It is possible for length and pos to be zero in which case
+ // seek should return instead of throwing exception
+ return;
+ }
+ throw new EOFException(
+ "EOF encountered at pos: " + pos + " for block: "
+ + blockInfo.getBlockID());
+ }
+ if (position != pos) {
+ position = pos;
+ seeked = true;
+ }
}
@Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 647483a..aa22a0b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -36,6 +36,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -281,6 +282,69 @@ public class TestECBlockInputStream {
}
}
+ @Test(expected = EOFException.class)
+ public void testSeekPastBlockLength() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ ecb.seek(1000);
+ }
+ }
+
+ @Test(expected = EOFException.class)
+ public void testSeekToLength() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ ecb.seek(100);
+ }
+ }
+
+ @Test
+ public void testSeekToLengthZeroLengthBlock() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 0);
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ ecb.seek(0);
+ Assert.assertEquals(0, ecb.getPos());
+ Assert.assertEquals(0, ecb.getRemaining());
+ }
+ }
+
+ @Test
+ public void testSeekToValidPosition() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ ecb.seek(ONEMB - 1);
+ Assert.assertEquals(ONEMB - 1, ecb.getPos());
+ Assert.assertEquals(ONEMB * 4 + 1, ecb.getRemaining());
+ // First read should read the last byte of the first chunk
+ Assert.assertEquals(0, ecb.read());
+ Assert.assertEquals(ONEMB,
+ streamFactory.getBlockStreams().get(0).position);
+ // Second read should be the first byte of the second chunk.
+ Assert.assertEquals(1, ecb.read());
+
+ // Seek to the end of the file minus one byte
+ ecb.seek(ONEMB * 5 - 1);
+ Assert.assertEquals(1, ecb.read());
+ Assert.assertEquals(ONEMB * 2,
+ streamFactory.getBlockStreams().get(1).position);
+ // Second read should be EOF as there is no data left
+ Assert.assertEquals(-1, ecb.read());
+ Assert.assertEquals(0, ecb.getRemaining());
+ }
+ }
+
private void validateBufferContents(ByteBuffer buf, int from, int to,
byte val) {
for (int i=from; i<to; i++){
@@ -412,8 +476,13 @@ public class TestECBlockInputStream {
}
@Override
+ public void seek(long pos) {
+ this.position = pos;
+ }
+
+ @Override
public long getPos() {
- return 0;
+ return position;
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 7a23715..dfd2a64 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -23,7 +23,8 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -48,6 +49,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@@ -113,7 +115,7 @@ public abstract class TestInputStreamBase {
conf.setFromObject(repConf);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(4)
+ .setNumDatanodes(5)
.setTotalPipelineNumLimit(5)
.setBlockSize(BLOCK_SIZE)
.setChunkSize(CHUNK_SIZE)
@@ -168,8 +170,14 @@ public abstract class TestInputStreamBase {
}
byte[] writeKey(String keyName, int dataLength) throws Exception {
+ ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
+ return writeKey(keyName, repConfig, dataLength);
+ }
+
+ byte[] writeKey(String keyName, ReplicationConfig repConfig, int dataLength)
+ throws Exception {
OzoneOutputStream key = TestHelper.createKey(keyName,
- ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+ repConfig, 0, objectStore, volumeName, bucketName);
byte[] inputData = ContainerTestHelper.getFixedLengthString(
keyString, dataLength).getBytes(UTF_8);
@@ -181,8 +189,15 @@ public abstract class TestInputStreamBase {
byte[] writeRandomBytes(String keyName, int dataLength)
throws Exception {
+ ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
+ return writeRandomBytes(keyName, repConfig, dataLength);
+ }
+
+ byte[] writeRandomBytes(String keyName, ReplicationConfig repConfig,
+ int dataLength)
+ throws Exception {
OzoneOutputStream key = TestHelper.createKey(keyName,
- ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+ repConfig, 0, objectStore, volumeName, bucketName);
byte[] inputData = new byte[dataLength];
RAND.nextBytes(inputData);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
index 89e78e9..4946fee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
@@ -23,8 +23,10 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -51,6 +53,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
import static org.junit.Assert.fail;
@@ -96,6 +99,24 @@ public class TestKeyInputStream extends TestInputStreamBase {
}
/**
+ * This method does random seeks and reads and validates the reads are
+ * correct or not.
+ * @param dataLength
+ * @param keyInputStream
+ * @param inputData
+ * @param readSize
+ * @throws Exception
+ */
+ private void randomPositionSeek(int dataLength, KeyInputStream
keyInputStream,
+ byte[] inputData, int readSize) throws Exception {
+ Random rand = new Random();
+ for (int i = 0; i < 100; i++) {
+ int position = rand.nextInt(dataLength - readSize);
+ validate(keyInputStream, inputData, position, readSize);
+ }
+ }
+
+ /**
* This method seeks to specified seek value and read the data specified by
* readLength and validate the read is correct or not.
* @param keyInputStream
@@ -126,6 +147,7 @@ public class TestKeyInputStream extends TestInputStreamBase {
testReadChunkWithByteArray();
testReadChunkWithByteBuffer();
testSkip();
+ testECSeek();
}
public void testInputStreams() throws Exception {
@@ -198,6 +220,36 @@ public class TestKeyInputStream extends TestInputStreamBase {
keyInputStream.close();
}
+ public void testECSeek() throws Exception {
+ int ecChunkSize = 1024 * 1024;
+ ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS,
+ ecChunkSize);
+ String keyName = getNewKeyName();
+ // 3 full EC blocks plus one chunk
+ int dataLength = (9 * BLOCK_SIZE + ecChunkSize);
+
+ byte[] inputData = writeRandomBytes(keyName, repConfig, dataLength);
+ try(KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+ validate(keyInputStream, inputData, 0, ecChunkSize + 1234);
+
+ validate(keyInputStream, inputData, 200, ecChunkSize);
+
+ validate(keyInputStream, inputData, BLOCK_SIZE * 4, ecChunkSize);
+
+ validate(keyInputStream, inputData, BLOCK_SIZE * 4 + 200, ecChunkSize);
+
+ validate(keyInputStream, inputData, dataLength - ecChunkSize - 100,
+ ecChunkSize + 50);
+
+ randomPositionSeek(dataLength, keyInputStream, inputData,
+ ecChunkSize + 200);
+
+ // Read entire key.
+ validate(keyInputStream, inputData, 0, dataLength);
+ }
+ }
+
public void testSeek() throws Exception {
XceiverClientManager.resetXceiverClientMetrics();
XceiverClientMetrics metrics = XceiverClientManager
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]