ZanderXu commented on code in PR #6710:
URL: https://github.com/apache/hadoop/pull/6710#discussion_r1556772812


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,

Review Comment:
   This `out` stream should be closed in the `finally` block, right?
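   
   A minimal sketch of that cleanup, assuming the variables from the quoted test (`fs`, `path`, `createFlags`) plus `org.apache.hadoop.io.IOUtils`:
   ```
   FSDataOutputStream out = null;
   try {
     out = fs.create(path, FsPermission.getFileDefault(), createFlags,
         fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
         fs.getDefaultBlockSize(path), null);
     // ... write the test data and run the read assertions ...
   } finally {
     // Close the stream even if the test body throws, so the
     // MiniDFSCluster can shut down cleanly.
     IOUtils.closeStream(out);
   }
   ```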
   



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {
+        rb.nextBytes(toWrite);
+        int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
+        out.write(toWrite, 0, bytesToWriteNext);

Review Comment:
   Please add a comment explaining that the intent is to create a file that contains only one under-construction (UC) block.
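   
   For instance, the loop could be prefaced like this (the wording is only a suggestion):
   ```
   // Write fileLen bytes without closing or hflushing the stream, so the
   // file consists of a single block that is still under construction (UC).
   long bytesToWrite = fileLen;
   while (bytesToWrite > 0) {
     rb.nextBytes(toWrite);
     int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
     out.write(toWrite, 0, bytesToWriteNext);
     bytesToWrite -= bytesToWriteNext;
   }
   ```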



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);

Review Comment:
   What's the default block size? 256MB? If so, please hardcode it.
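   
   If pinning it down is the goal, the create call could take an explicit constant; the 128 MB value below matches the stock `dfs.blocksize` default and is only an assumed choice:
   ```
   // Hardcode the block size instead of relying on the cluster default.
   final long blockSize = 128L * 1024 * 1024;
   FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
       fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
       blockSize, null);
   ```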



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java:
##########
@@ -520,6 +520,15 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
         // Update the LastLocatedBlock, if offset is for last block.
         if (offset >= locatedBlocks.getFileLength()) {
           setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
+          // Here locatedBlocks has been updated, so the offset needs to be checked again.
+          // If the offset falls within the last block, the last block will be returned;
+          // otherwise the block containing the specified offset needs to be searched for again.
+          if (offset >= locatedBlocks.getFileLength()) {

Review Comment:
   Make sense. Please make the comments clearer.
   ```
             /**
              * After updating the locatedBlock, the block to which the offset 
belongs
              * should be researched like {@link 
DFSInputStream#getBlockAt(long)}.
              */
   ```
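   
   In context, the double-check from the hunk might then read as follows (the inner body is elided here, as in the quoted diff):
   ```
   if (offset >= locatedBlocks.getFileLength()) {
     setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
     /**
      * After updating the locatedBlock, the block to which the offset belongs
      * should be re-searched, like {@link DFSInputStream#getBlockAt(long)}.
      */
     if (offset >= locatedBlocks.getFileLength()) {
       // ... resolve the offset against the refreshed locatedBlocks ...
     }
   }
   ```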



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {
+        rb.nextBytes(toWrite);
+        int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
+        out.write(toWrite, 0, bytesToWriteNext);
+        bytesToWrite -= bytesToWriteNext;
+      }
+
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return fs.getFileBlockLocations(path, 0, fileLen).length == 1;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 100, 10000);
+
+      // Set up the InjectionHandler.
+      DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+      DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          // Mock an invalid access token when connecting to the first datanode
+          // by throwing InvalidBlockTokenException.
+          if (count.getAndIncrement() == 0) {
+            throw new InvalidBlockTokenException("Mock InvalidBlockTokenException");
+          }
+          return null;
+        }
+      }).when(injector).failCreateBlockReader();
+
+      try (DFSInputStream in = new DFSInputStream(fs.getClient(), file,
+          false, null)) {
+        int bufLen = 1024;
+        byte[] buf = new byte[bufLen];
+        // Seek the offset to 1024.
+        in.seek(1024);

Review Comment:
   Please add a comment noting that the offset should lie in (0, fileSize).
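   
   One way the comment might read (the rationale wording is only a suggestion):
   ```
   // Seek to an offset strictly inside the file, i.e. in (0, fileSize),
   // so the faulty first connection targets a mid-file position.
   in.seek(1024);
   ```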



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {

Review Comment:
   This logic is unnecessary if you just want to write 64KB of data; 2KB would be enough, right? I see you only seek to 1024.
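   
   A minimal sketch of that simplification (2KB already covers the seek to 1024):
   ```
   // Write 2KB in a single call; the stream stays open, so the block
   // remains under construction, and offset 1024 is still inside the file.
   byte[] toWrite = new byte[2048];
   new Random(0).nextBytes(toWrite);
   out.write(toWrite);
   ```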



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

