This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 6b8d935  HDDS-6319. EC: Fix read big file failure with EC policy 10+4. (#3094)
6b8d935 is described below

commit 6b8d935f7340492bb2a3f9bb86482665925052d6
Author: Gui Hecheng <[email protected]>
AuthorDate: Thu Feb 24 01:41:03 2022 +0800

    HDDS-6319. EC: Fix read big file failure with EC policy 10+4. (#3094)
---
 .../hadoop/ozone/client/io/KeyInputStream.java     |   7 +-
 .../ozone/client/rpc/read/ECStreamTestUtil.java    |   4 +-
 .../client/rpc/read/TestKeyInputStreamEC.java      | 118 +++++++++++++++++++++
 3 files changed, 126 insertions(+), 3 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 46a7b50..20cb2b5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -210,6 +210,11 @@ public class KeyInputStream extends ExtendedInputStream {
     blockStreams.add(blockInputStream);
   }
 
+  @VisibleForTesting
+  public void addStream(BlockExtendedInputStream blockInputStream) {
+    blockStreams.add(blockInputStream);
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -264,7 +269,7 @@ public class KeyInputStream extends ExtendedInputStream {
 
       // Get the current blockStream and read data from it
       BlockExtendedInputStream current = blockStreams.get(blockIndex);
-      int numBytesToRead = Math.min(buffLen, (int)current.getRemaining());
+      int numBytesToRead = (int)Math.min(buffLen, current.getRemaining());
       int numBytesRead = strategy.readFromBlock(current, numBytesToRead);
       if (numBytesRead != numBytesToRead) {
         // This implies that there is either data loss or corruption in the
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index b770d42..b8b9977 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -327,7 +327,7 @@ public final class ECStreamTestUtil {
 
     @Override
     public long getRemaining() {
-      return data.remaining();
+      return getLength() - getPos();
     }
 
     @Override
@@ -344,7 +344,7 @@ public final class ECStreamTestUtil {
       if (getRemaining() == 0) {
         return EOF;
       }
-      int toRead = Math.min(buf.remaining(), (int)getRemaining());
+      int toRead = (int)Math.min(buf.remaining(), getRemaining());
       for (int i = 0; i < toRead; i++) {
         if (shouldError && data.position() >= shouldErrorPosition) {
           throwError();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStreamEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStreamEC.java
new file mode 100644
index 0000000..fa6688f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStreamEC.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.io.LengthInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test KeyInputStream with EC keys.
+ */
+public class TestKeyInputStreamEC {
+
+  @Test
+  public void testReadAgainstLargeBlockGroup() throws IOException {
+    int dataBlocks = 10;
+    int parityBlocks = 4;
+    ECReplicationConfig ec10And4RepConfig = new ECReplicationConfig(dataBlocks,
+        parityBlocks, ECReplicationConfig.EcCodec.RS, (int)(1 * MB));
+    // default blockSize of 256MB with EC 10+4 makes a large block group
+    long blockSize = 256 * MB;
+    long blockLength = dataBlocks * blockSize;
+    OmKeyInfo keyInfo = createOmKeyInfo(ec10And4RepConfig,
+        dataBlocks + parityBlocks, blockLength);
+
+    BlockExtendedInputStream blockInputStream =
+        new ECStreamTestUtil.TestBlockInputStream(new BlockID(1, 1),
+        blockLength, ByteBuffer.allocate(100));
+
+    BlockInputStreamFactory mockStreamFactory =
+        mock(BlockInputStreamFactory.class);
+    when(mockStreamFactory.create(any(), any(), any(), any(),
+        anyBoolean(), any(), any())).thenReturn(blockInputStream);
+
+    try (LengthInputStream kis = KeyInputStream.getFromOmKeyInfo(keyInfo,
+        null, true,  null, mockStreamFactory)) {
+      byte[] buf = new byte[100];
+      int readBytes = kis.read(buf, 0, 100);
+      Assert.assertEquals(100, readBytes);
+    }
+  }
+
+  private OmKeyInfo createOmKeyInfo(ReplicationConfig repConf,
+      int nodeCount, long blockLength) {
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+    for (int i = 0; i < nodeCount; i++) {
+      dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+    }
+
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setReplicationConfig(repConf)
+        .build();
+
+    OmKeyLocationInfo blockInfo = new OmKeyLocationInfo.Builder()
+        .setBlockID(new BlockID(1, 1))
+        .setLength(blockLength)
+        .setOffset(0)
+        .setPipeline(pipeline)
+        .setPartNumber(0)
+        .build();
+
+    List<OmKeyLocationInfo> locations = new ArrayList<>();
+    locations.add(blockInfo);
+    return new OmKeyInfo.Builder()
+        .setBucketName("bucket")
+        .setVolumeName("volume")
+        .setDataSize(blockLength)
+        .setKeyName("someKey")
+        .setReplicationConfig(repConf)
+        .addOmKeyLocationInfoGroup(new OmKeyLocationInfoGroup(0, locations))
+        .build();
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to