This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 6b8d935 HDDS-6319. EC: Fix read big file failure with EC policy 10+4. (#3094)
6b8d935 is described below
commit 6b8d935f7340492bb2a3f9bb86482665925052d6
Author: Gui Hecheng <[email protected]>
AuthorDate: Thu Feb 24 01:41:03 2022 +0800
HDDS-6319. EC: Fix read big file failure with EC policy 10+4. (#3094)
---
.../hadoop/ozone/client/io/KeyInputStream.java | 7 +-
.../ozone/client/rpc/read/ECStreamTestUtil.java | 4 +-
.../client/rpc/read/TestKeyInputStreamEC.java | 118 +++++++++++++++++++++
3 files changed, 126 insertions(+), 3 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 46a7b50..20cb2b5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -210,6 +210,11 @@ public class KeyInputStream extends ExtendedInputStream {
blockStreams.add(blockInputStream);
}
+ @VisibleForTesting
+ public void addStream(BlockExtendedInputStream blockInputStream) {
+ blockStreams.add(blockInputStream);
+ }
+
/**
* {@inheritDoc}
*/
@@ -264,7 +269,7 @@ public class KeyInputStream extends ExtendedInputStream {
// Get the current blockStream and read data from it
BlockExtendedInputStream current = blockStreams.get(blockIndex);
- int numBytesToRead = Math.min(buffLen, (int)current.getRemaining());
+ int numBytesToRead = (int)Math.min(buffLen, current.getRemaining());
int numBytesRead = strategy.readFromBlock(current, numBytesToRead);
if (numBytesRead != numBytesToRead) {
// This implies that there is either data loss or corruption in the
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index b770d42..b8b9977 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -327,7 +327,7 @@ public final class ECStreamTestUtil {
@Override
public long getRemaining() {
- return data.remaining();
+ return getLength() - getPos();
}
@Override
@@ -344,7 +344,7 @@ public final class ECStreamTestUtil {
if (getRemaining() == 0) {
return EOF;
}
- int toRead = Math.min(buf.remaining(), (int)getRemaining());
+ int toRead = (int)Math.min(buf.remaining(), getRemaining());
for (int i = 0; i < toRead; i++) {
if (shouldError && data.position() >= shouldErrorPosition) {
throwError();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStreamEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStreamEC.java
new file mode 100644
index 0000000..fa6688f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStreamEC.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.io.LengthInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test KeyInputStream with EC keys.
+ */
+public class TestKeyInputStreamEC {
+
+ @Test
+ public void testReadAgainstLargeBlockGroup() throws IOException {
+ int dataBlocks = 10;
+ int parityBlocks = 4;
+ ECReplicationConfig ec10And4RepConfig = new ECReplicationConfig(dataBlocks,
+ parityBlocks, ECReplicationConfig.EcCodec.RS, (int)(1 * MB));
+ // default blockSize of 256MB with EC 10+4 makes a large block group
+ long blockSize = 256 * MB;
+ long blockLength = dataBlocks * blockSize;
+ OmKeyInfo keyInfo = createOmKeyInfo(ec10And4RepConfig,
+ dataBlocks + parityBlocks, blockLength);
+
+ BlockExtendedInputStream blockInputStream =
+ new ECStreamTestUtil.TestBlockInputStream(new BlockID(1, 1),
+ blockLength, ByteBuffer.allocate(100));
+
+ BlockInputStreamFactory mockStreamFactory =
+ mock(BlockInputStreamFactory.class);
+ when(mockStreamFactory.create(any(), any(), any(), any(),
+ anyBoolean(), any(), any())).thenReturn(blockInputStream);
+
+ try (LengthInputStream kis = KeyInputStream.getFromOmKeyInfo(keyInfo,
+ null, true, null, mockStreamFactory)) {
+ byte[] buf = new byte[100];
+ int readBytes = kis.read(buf, 0, 100);
+ Assert.assertEquals(100, readBytes);
+ }
+ }
+
+ private OmKeyInfo createOmKeyInfo(ReplicationConfig repConf,
+ int nodeCount, long blockLength) {
+ Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+ for (int i = 0; i < nodeCount; i++) {
+ dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+ }
+
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .setNodes(new ArrayList<>(dnMap.keySet()))
+ .setReplicaIndexes(dnMap)
+ .setReplicationConfig(repConf)
+ .build();
+
+ OmKeyLocationInfo blockInfo = new OmKeyLocationInfo.Builder()
+ .setBlockID(new BlockID(1, 1))
+ .setLength(blockLength)
+ .setOffset(0)
+ .setPipeline(pipeline)
+ .setPartNumber(0)
+ .build();
+
+ List<OmKeyLocationInfo> locations = new ArrayList<>();
+ locations.add(blockInfo);
+ return new OmKeyInfo.Builder()
+ .setBucketName("bucket")
+ .setVolumeName("volume")
+ .setDataSize(blockLength)
+ .setKeyName("someKey")
+ .setReplicationConfig(repConf)
+ .addOmKeyLocationInfoGroup(new OmKeyLocationInfoGroup(0, locations))
+ .build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]