This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81b05977f259 HDFS-17455. Fix Client throw IndexOutOfBoundsException in 
DFSInputStream#fetchBlockAt (#6710). Contributed by Haiyang Hu.
81b05977f259 is described below

commit 81b05977f2592f17e65242637f4366e51961c778
Author: huhaiyang <huhaiyang...@126.com>
AuthorDate: Thu Apr 11 18:04:57 2024 +0800

    HDFS-17455. Fix Client throw IndexOutOfBoundsException in 
DFSInputStream#fetchBlockAt (#6710). Contributed by Haiyang Hu.
    
    Reviewed-by: ZanderXu <zande...@apache.org>
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../apache/hadoop/hdfs/DFSClientFaultInjector.java |  2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  9 +++
 .../org/apache/hadoop/hdfs/TestDFSInputStream.java | 72 ++++++++++++++++++++++
 3 files changed, 83 insertions(+)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index caf8aad32e34..8150b6fb0e20 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -69,4 +70,5 @@ public class DFSClientFaultInjector {
 
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long 
offset, long length) {}
 
+  public void failCreateBlockReader() throws InvalidBlockTokenException {}
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 1ec55e4f8206..b54694fab110 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -519,6 +519,14 @@ public class DFSInputStream extends FSInputStream
         // Update the LastLocatedBlock, if offset is for last block.
         if (offset >= locatedBlocks.getFileLength()) {
           setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
+          // After updating the locatedBlock, the block to which the offset 
belongs
+          // should be researched like {@link DFSInputStream#getBlockAt(long)}.
+          if (offset >= locatedBlocks.getFileLength()) {
+            return locatedBlocks.getLastLocatedBlock();
+          } else {
+            targetBlockIdx = locatedBlocks.findBlock(offset);
+            assert targetBlockIdx >= 0 && targetBlockIdx < 
locatedBlocks.locatedBlockCount();
+          }
         } else {
           locatedBlocks.insertRange(targetBlockIdx,
               newBlocks.getLocatedBlocks());
@@ -641,6 +649,7 @@ public class DFSInputStream extends FSInputStream
       targetBlock = retval.block;
 
       try {
+        DFSClientFaultInjector.get().failCreateBlockReader();
         blockReader = getBlockReader(targetBlock, offsetIntoBlock,
             targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
             storageType, chosenNode);
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index 2f9e0d319cdb..b2b18237a752 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -31,6 +31,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,14 +43,21 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
 
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestDFSInputStream {
   private void testSkipInner(MiniDFSCluster cluster) throws IOException {
@@ -287,4 +296,67 @@ public class TestDFSInputStream {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64 * 1024);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 516);
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    FSDataOutputStream out = null;
+    try (MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create file which only contains one UC block.
+      String file = "/testfile";
+      Path path = new Path(file);
+      out = fs.create(path, (short) 3);
+      int bufferLen = 5120;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      rb.nextBytes(toWrite);
+      out.write(toWrite, 0, bufferLen);
+
+      // Wait for the block length of the file to be 1.
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return fs.getFileBlockLocations(path, 0, bufferLen).length == 1;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 100, 10000);
+
+      // Set up the InjectionHandler.
+      DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+      DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          // Mock access token was invalid when connecting to first datanode
+          // throw InvalidBlockTokenException.
+          if (count.getAndIncrement() == 0) {
+            throw new InvalidBlockTokenException("Mock 
InvalidBlockTokenException");
+          }
+          return null;
+        }
+      }).when(injector).failCreateBlockReader();
+
+      try (DFSInputStream in = new DFSInputStream(fs.getClient(), file,
+          false, null)) {
+        int bufLen = 1024;
+        byte[] buf = new byte[bufLen];
+        // Seek the offset to 1024 and which should be in the range (0, 
fileSize).
+        in.seek(1024);
+        int read = in.read(buf, 0, bufLen);
+        assertEquals(1024, read);
+      }
+    } finally {
+      DFSClientFaultInjector.set(oldFaultInjector);
+      IOUtils.closeStream(out);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to