This is an automated email from the ASF dual-hosted git repository.

surendralilhore pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 382967b  HDFS-14373. EC: Decoding fails when the block group's last incomplete cell falls into an AlignedStripe. Contributed by Surendra Singh Lilhore.
382967b is described below

commit 382967be51052d59e31d8d05713645b8d3c2325b
Author: Surendra Singh Lilhore <surendralilh...@apache.org>
AuthorDate: Tue Oct 8 00:14:30 2019 +0530

    HDFS-14373. EC: Decoding fails when the block group's last incomplete cell falls into an AlignedStripe. Contributed by Surendra Singh Lilhore.
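
For context, here is a minimal standalone sketch of the geometry that
triggers the bug (not part of the commit; the cell size and data-unit
count are assumptions mirroring the RS-6-3-1024k setup in the new test):
when a file does not end on a cell boundary, the block group's last cell
is shorter than cellSize, and its end offset must be treated as a stripe
boundary or decode reads past the real data.

    // Hypothetical illustration only; names and values are assumptions.
    public class LastCellOffsetDemo {
      public static void main(String[] args) {
        final long cellSize = 1024 * 1024;  // 1 MB cells (assumed)
        final int dataUnits = 6;            // RS-6-3 data units (assumed)
        // One stripe whose last data cell is only half filled, as in the
        // regression test below.
        final long blockGroupSize = cellSize * dataUnits - cellSize / 2;

        int lastCellIdxInBG = (int) (blockGroupSize / cellSize);   // 5
        int idxInInternalBlk = lastCellIdxInBG / dataUnits;        // 0
        long lastCellEndOffset = idxInInternalBlk * cellSize
            + (blockGroupSize % cellSize);                         // 524288
        System.out.println("last cell ends at " + lastCellEndOffset
            + " bytes into its internal block");
      }
    }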
---
 .../java/org/apache/hadoop/hdfs/StripeReader.java  |  4 ++
 .../apache/hadoop/hdfs/util/StripedBlockUtil.java  | 20 +++++++--
 .../hadoop/hdfs/TestDFSStripedInputStream.java     | 47 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index e90af84..8fd38bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -248,6 +248,8 @@ abstract class StripeReader {
       DFSClient.LOG.warn("Found Checksum error for "
           + currentBlock + " from " + currentNode
           + " at " + ce.getPos());
+      // Clear the read buffer so the next decode attempt can succeed.
+      strategy.getReadBuffer().clear();
       // we want to remember which block replicas we have tried
       corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
       throw ce;
@@ -255,6 +257,8 @@ abstract class StripeReader {
       DFSClient.LOG.warn("Exception while reading from "
           + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
           + currentNode, e);
+      // Clear the read buffer so the next decode attempt can succeed.
+      strategy.getReadBuffer().clear();
       throw e;
     }
   }
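
The two added lines above reset the read buffer after a failed or
checksum-corrupt read. A small sketch of the java.nio behaviour this
relies on (mine, not patch code): clear() rewinds the position to zero
and restores the limit to the capacity, so a retry on another source or
a later decode does not see leftover state from the aborted read.

    import java.nio.ByteBuffer;

    // Sketch: a buffer abandoned mid-read keeps its advanced position;
    // clear() resets it so the next attempt starts from a clean slate.
    public class BufferResetDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put(new byte[] {1, 2, 3});        // read fails partway through
        System.out.println(buf.position());   // 3: stale state
        buf.clear();                          // position = 0, limit = 8
        System.out.println(buf.position());   // 0: ready for next attempt
      }
    }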
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 7251c7b..012d708 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -356,7 +356,8 @@ public class StripedBlockUtil {
         cells);
 
     // Step 3: merge into stripes
-    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+        blockGroup, cellSize);
 
     // Step 4: calculate each chunk's position in destination buffer. Since the
     // whole read range is within a single stripe, the logic is simpler here.
@@ -417,7 +418,8 @@ public class StripedBlockUtil {
         cells);
 
     // Step 3: merge into at most 5 stripes
-    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+        blockGroup, cellSize);
 
     // Step 4: calculate each chunk's position in destination buffer
     calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
@@ -513,7 +515,8 @@ public class StripedBlockUtil {
    * {@link AlignedStripe} instances.
    */
   private static AlignedStripe[] mergeRangesForInternalBlocks(
-      ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
+      ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
+      LocatedStripedBlock blockGroup, int cellSize) {
     int dataBlkNum = ecPolicy.getNumDataUnits();
     int parityBlkNum = ecPolicy.getNumParityUnits();
     List<AlignedStripe> stripes = new ArrayList<>();
@@ -525,6 +528,17 @@ public class StripedBlockUtil {
       }
     }
 
+    // Add the block group's last cell end offset to stripePoints if it
+    // falls within the read offset range.
+    int lastCellIdxInBG = (int) (blockGroup.getBlockSize() / cellSize);
+    int idxInInternalBlk = lastCellIdxInBG / ecPolicy.getNumDataUnits();
+    long lastCellEndOffset = (idxInInternalBlk * (long)cellSize)
+        + (blockGroup.getBlockSize() % cellSize);
+    if (stripePoints.first() < lastCellEndOffset
+        && stripePoints.last() > lastCellEndOffset) {
+      stripePoints.add(lastCellEndOffset);
+    }
+
     long prev = -1;
     for (long point : stripePoints) {
       if (prev >= 0) {
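
To see the effect of the added stripe point, here is a self-contained
sketch (assumed values taken from the test scenario; this is not the
patched method itself): the extra point splits the merged range at the
end of the incomplete cell, so no AlignedStripe spans that boundary.

    import java.util.TreeSet;

    // Illustration with assumed numbers: 1 MB cells, last cell ending
    // at 512 KB within its internal block, read range covering [0, 1 MB).
    public class StripePointDemo {
      public static void main(String[] args) {
        TreeSet<Long> stripePoints = new TreeSet<>();
        stripePoints.add(0L);             // read range start
        stripePoints.add(1048576L);       // read range end

        long lastCellEndOffset = 524288L; // from the block group size
        if (stripePoints.first() < lastCellEndOffset
            && stripePoints.last() > lastCellEndOffset) {
          stripePoints.add(lastCellEndOffset);
        }
        // Consecutive points now delimit two stripes, [0, 512K) and
        // [512K, 1M), instead of one stripe crossing the short cell.
        System.out.println(stripePoints); // [0, 524288, 1048576]
      }
    }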
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 604a8c6..8c2367d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -561,6 +564,50 @@ public class TestDFSStripedInputStream {
     }
   }
 
+  @Test
+  public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
+      throws IOException {
+    DataNodeProperties stopDataNode = null;
+    try {
+      cluster.waitActive();
+      ErasureCodingPolicy policy = getEcPolicy();
+      DistributedFileSystem filesystem = cluster.getFileSystem();
+      filesystem.enableErasureCodingPolicy(policy.getName());
+      Path dir = new Path("/tmp");
+      filesystem.mkdirs(dir);
+      filesystem.getClient().setErasureCodingPolicy(dir.toString(),
+          policy.getName());
+      Path f = new Path(dir, "file");
+
+      // 1. File with one stripe; the last data cell should be half filled.
+      long fileLength = (policy.getCellSize() * policy.getNumDataUnits())
+          - (policy.getCellSize() / 2);
+      DFSTestUtil.createFile(filesystem, f, fileLength, (short) 1, 0);
+
+      // 2. Stop the first DN serving the stripe.
+      LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+          f.toString(), 0, fileLength);
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+      final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
+          cellSize, dataBlocks, parityBlocks);
+      stopDataNode = cluster.stopDataNode(blocks[0].getLocations()[0].getName());
+
+      // 3. Do a pread for the first cell; reconstruction should happen.
+      try (FSDataInputStream in = filesystem.open(f)) {
+        DFSStripedInputStream stripedIn = (DFSStripedInputStream) in
+            .getWrappedStream();
+        byte[] b = new byte[policy.getCellSize()];
+        stripedIn.read(0, b, 0, policy.getCellSize());
+      }
+    } catch (HadoopIllegalArgumentException e) {
+      fail(e.getMessage());
+    } finally {
+      if (stopDataNode != null) {
+        cluster.restartDataNode(stopDataNode, true);
+      }
+    }
+  }
+
   /**
    * Empties the pool for the specified buffer type, for the current ecPolicy.
    * <p>
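
For anyone verifying the fix locally, the new regression test can be run
on its own with the usual single-test surefire invocation (assuming a
built Hadoop source tree):

    mvn test -pl hadoop-hdfs-project/hadoop-hdfs \
        -Dtest=TestDFSStripedInputStream#testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe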


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
