This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c387656530 HDDS-12063. Speed up TestLeaseRecovery (#7688)
c387656530 is described below

commit c3876565308902cbe96fb9b9ce0b8d9030f6e7c9
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Jan 13 15:03:21 2025 +0100

    HDDS-12063. Speed up TestLeaseRecovery (#7688)
---
 .../apache/hadoop/fs/ozone/TestLeaseRecovery.java  | 237 +++++++++++----------
 1 file changed, 130 insertions(+), 107 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
index d488530033..f178bf2435 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
@@ -41,14 +41,22 @@ import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.utils.FaultInjectorImpl;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ozone.test.tag.Flaky;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -58,8 +66,10 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.ConnectException;
+import java.util.LinkedList;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT;
@@ -75,6 +85,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -83,7 +94,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Timeout(300)
 @Flaky("HDDS-11323")
-public class TestLeaseRecovery {
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TestLeaseRecovery extends OzoneTestBase {
+
+  private static final AtomicInteger FILE_COUNTER = new AtomicInteger();
 
   private MiniOzoneCluster cluster;
   private OzoneBucket bucket;
@@ -92,6 +107,8 @@ public class TestLeaseRecovery {
   private final OzoneConfiguration conf = new OzoneConfiguration();
   private String dir;
   private Path file;
+  private GenericTestUtils.LogCapturer xceiverClientLogs;
+  private RootedOzoneFileSystem fs;
 
   /**
    * Closing the output stream after lease recovery throws because the key
@@ -104,12 +121,15 @@ public class TestLeaseRecovery {
   public static void closeIgnoringOMException(OutputStream stream, 
OMException.ResultCodes expectedResultCode) {
     try {
       stream.close();
-    } catch (IOException e) {
-      assertEquals(expectedResultCode, ((OMException)e).getResult());
+    } catch (OMException e) {
+      assertEquals(expectedResultCode, e.getResult());
+    } catch (Exception e) {
+      OMException omException = assertInstanceOf(OMException.class, 
e.getCause());
+      assertEquals(expectedResultCode, omException.getResult());
     }
   }
 
-  @BeforeEach
+  @BeforeAll
   public void init() throws IOException, InterruptedException,
       TimeoutException {
     final int chunkSize = 16 << 10;
@@ -120,6 +140,7 @@ public class TestLeaseRecovery {
 
     conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
     conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+    conf.setBoolean("fs." + OZONE_OFS_URI_SCHEME + ".impl.disable.cache", 
true);
     conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
     conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
     conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
@@ -152,10 +173,24 @@ public class TestLeaseRecovery {
     final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, 
conf.get(OZONE_OM_ADDRESS_KEY));
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
     dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + 
bucket.getName();
-    file = new Path(dir, "file");
+
+    xceiverClientLogs = 
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+  }
+
+  @BeforeEach
+  void beforeEach() throws Exception {
+    file = new Path(dir, "file-" + getTestName() + "-" + 
FILE_COUNTER.incrementAndGet());
+    fs = (RootedOzoneFileSystem) FileSystem.get(conf);
   }
 
   @AfterEach
+  void afterEach() {
+    IOUtils.closeQuietly(fs);
+    xceiverClientLogs.clearOutput();
+    KeyValueHandler.setInjector(null);
+  }
+
+  @AfterAll
   public void tearDown() {
     IOUtils.closeQuietly(client);
     if (cluster != null) {
@@ -166,8 +201,6 @@ public class TestLeaseRecovery {
   @ParameterizedTest
   @ValueSource(ints = {1 << 17, (1 << 17) + 1, (1 << 17) - 1})
   public void testRecovery(int dataSize) throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-
     final byte[] data = getData(dataSize);
 
     final FSDataOutputStream stream = fs.create(file, true);
@@ -199,8 +232,6 @@ public class TestLeaseRecovery {
 
   @Test
   public void testRecoveryWithoutHsyncHflushOnLastBlock() throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-
     int blockSize = (int) 
cluster.getOzoneManager().getConfiguration().getStorageSize(
         OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
 
@@ -235,24 +266,16 @@ public class TestLeaseRecovery {
 
   @Test
   public void testOBSRecoveryShouldFail() throws Exception {
-    // Set the fs.defaultFS
-    bucket = TestDataUtil.createVolumeAndBucket(client,
+    OzoneBucket obsBucket = TestDataUtil.createVolumeAndBucket(client,
         "vol2", "obs", BucketLayout.OBJECT_STORE);
-    final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME,
-        conf.get(OZONE_OM_ADDRESS_KEY));
-    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    String obsDir = OZONE_ROOT + obsBucket.getVolumeName() + 
OZONE_URI_DELIMITER + obsBucket.getName();
+    Path obsFile = new Path(obsDir, "file" + getTestName() + 
FILE_COUNTER.incrementAndGet());
 
-    final String directory = OZONE_ROOT + bucket.getVolumeName() +
-        OZONE_URI_DELIMITER + bucket.getName();
-    final Path f = new Path(directory, "file");
-
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
-    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f));
+    assertThrows(IllegalArgumentException.class, () -> 
fs.recoverLease(obsFile));
   }
 
   @Test
   public void testFinalizeBlockFailure() throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
     int dataSize = 100;
     final byte[] data = getData(dataSize);
 
@@ -294,7 +317,6 @@ public class TestLeaseRecovery {
 
   @Test
   public void testBlockPipelineClosed() throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
     int dataSize = 100;
     final byte[] data = getData(dataSize);
 
@@ -310,8 +332,7 @@ public class TestLeaseRecovery {
 
       // close the pipeline
       StorageContainerManager scm = cluster.getStorageContainerManager();
-      ContainerInfo container = 
scm.getContainerManager().getContainers().get(0);
-      OzoneTestUtils.closeContainer(scm, container);
+      ContainerInfo container = closeLatestContainer();
       GenericTestUtils.waitFor(() -> {
         try {
           return 
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
@@ -338,62 +359,59 @@ public class TestLeaseRecovery {
   @ValueSource(booleans = {false, true})
   public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws 
Exception {
     // reduce read timeout
-    conf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
+    OzoneConfiguration clientConf = new OzoneConfiguration(conf);
+    clientConf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
     // set force recovery
     System.setProperty(FORCE_LEASE_RECOVERY_ENV, 
String.valueOf(forceRecovery));
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-    int dataSize = 100;
-    final byte[] data = getData(dataSize);
+    try (RootedOzoneFileSystem fs = 
(RootedOzoneFileSystem)FileSystem.get(clientConf)) {
+      int dataSize = 100;
+      final byte[] data = getData(dataSize);
 
-    final FSDataOutputStream stream = fs.create(file, true);
-    try {
-      stream.write(data);
-      stream.hsync();
-      assertFalse(fs.isFileClosed(file));
-
-      // write more data without hsync
-      stream.write(data);
-      stream.flush();
-
-      // close the pipeline and container
-      ContainerInfo container = 
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
-      OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), 
container);
-      // pause getCommittedBlockLength handling on all DNs to make sure all 
getCommittedBlockLength will time out
-      FaultInjectorImpl injector = new FaultInjectorImpl();
-      injector.setType(ContainerProtos.Type.GetCommittedBlockLength);
-      KeyValueHandler.setInjector(injector);
-      GenericTestUtils.LogCapturer logs =
-          
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
-      if (!forceRecovery) {
-        assertThrows(IOException.class, () -> fs.recoverLease(file));
-        return;
-      } else {
-        fs.recoverLease(file);
+      final FSDataOutputStream stream = fs.create(file, true);
+      try {
+        stream.write(data);
+        stream.hsync();
+        assertFalse(fs.isFileClosed(file));
+
+        // write more data without hsync
+        stream.write(data);
+        stream.flush();
+
+        // close the pipeline and container
+        closeLatestContainer();
+        // pause getCommittedBlockLength handling on all DNs to make sure all 
getCommittedBlockLength will time out
+        FaultInjectorImpl injector = new FaultInjectorImpl();
+        injector.setType(ContainerProtos.Type.GetCommittedBlockLength);
+        KeyValueHandler.setInjector(injector);
+        if (!forceRecovery) {
+          assertThrows(IOException.class, () -> fs.recoverLease(file));
+          return;
+        } else {
+          fs.recoverLease(file);
+        }
+        assertEquals(3, StringUtils.countMatches(xceiverClientLogs.getOutput(),
+            "Executing command cmdType: GetCommittedBlockLength"));
+
+        // The lease should have been recovered.
+        assertTrue(fs.isFileClosed(file), "File should be closed");
+        FileStatus fileStatus = fs.getFileStatus(file);
+        // Since all DNs are out, then the length in OM keyInfo will be used 
as the final file length
+        assertEquals(dataSize, fileStatus.getLen());
+      } finally {
+        if (!forceRecovery) {
+          closeIgnoringOMException(stream, 
OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY);
+        } else {
+          closeIgnoringKeyNotFound(stream);
+        }
       }
-      assertEquals(3, StringUtils.countMatches(logs.getOutput(),
-          "Executing command cmdType: GetCommittedBlockLength"));
 
-      // The lease should have been recovered.
-      assertTrue(fs.isFileClosed(file), "File should be closed");
-      FileStatus fileStatus = fs.getFileStatus(file);
-      // Since all DNs are out, then the length in OM keyInfo will be used as 
the final file length
-      assertEquals(dataSize, fileStatus.getLen());
-    } finally {
-      if (!forceRecovery) {
-        closeIgnoringOMException(stream, 
OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY);
-      } else {
-        closeIgnoringKeyNotFound(stream);
-      }
-      KeyValueHandler.setInjector(null);
+      // open it again, make sure the data is correct
+      verifyData(data, dataSize, file, fs);
     }
-
-    // open it again, make sure the data is correct
-    verifyData(data, dataSize, file, fs);
   }
 
   @Test
   public void testGetCommittedBlockLengthWithException() throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
     int dataSize = 100;
     final byte[] data = getData(dataSize);
 
@@ -408,8 +426,7 @@ public class TestLeaseRecovery {
       stream.flush();
 
       // close the pipeline and container
-      ContainerInfo container = 
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
-      OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), 
container);
+      ContainerInfo container = closeLatestContainer();
       // throw exception on first DN getCommittedBlockLength handling
       FaultInjectorImpl injector = new FaultInjectorImpl();
       KeyValueHandler.setInjector(injector);
@@ -418,14 +435,13 @@ public class TestLeaseRecovery {
           ContainerProtos.Result.CONTAINER_NOT_FOUND);
       injector.setException(sce);
 
-      GenericTestUtils.LogCapturer logs =
-          
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
       fs.recoverLease(file);
 
-      assertEquals(2, StringUtils.countMatches(logs.getOutput(),
-          "Executing command cmdType: GetCommittedBlockLength"));
-      assertEquals(1, StringUtils.countMatches(logs.getOutput(),
-          "Failed to execute command cmdType: GetCommittedBlockLength"));
+      String output = xceiverClientLogs.getOutput();
+      assertEquals(2, StringUtils.countMatches(output,
+          "Executing command cmdType: GetCommittedBlockLength"), output);
+      assertEquals(1, StringUtils.countMatches(output,
+          "Failed to execute command cmdType: GetCommittedBlockLength"), 
output);
 
       // The lease should have been recovered.
       assertTrue(fs.isFileClosed(file), "File should be closed");
@@ -433,7 +449,6 @@ public class TestLeaseRecovery {
       assertEquals(dataSize * 2, fileStatus.getLen());
     } finally {
       closeIgnoringKeyNotFound(stream);
-      KeyValueHandler.setInjector(null);
     }
 
     // open it again, make sure the data is correct
@@ -441,29 +456,36 @@ public class TestLeaseRecovery {
   }
 
   @Test
+  @Order(Integer.MAX_VALUE)
   public void testOMConnectionFailure() throws Exception {
     // reduce hadoop RPC retry max attempts
-    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
-    conf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-    int dataSize = 100;
-    final byte[] data = getData(dataSize);
-
-    final FSDataOutputStream stream = fs.create(file, true);
-    try {
-      stream.write(data);
-      stream.hsync();
-      assertFalse(fs.isFileClosed(file));
-
-      // close OM
-      cluster.getOzoneManager().stop();
-      assertThrows(ConnectException.class, () -> fs.recoverLease(file));
-    } finally {
+    OzoneConfiguration clientConf = new OzoneConfiguration(conf);
+    clientConf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 
5);
+    clientConf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
+    try (RootedOzoneFileSystem fs = (RootedOzoneFileSystem) 
FileSystem.get(clientConf)) {
+      int dataSize = 100;
+      final byte[] data = getData(dataSize);
+
+      final FSDataOutputStream stream = fs.create(file, true);
+      OzoneManager om = cluster.getOzoneManager();
       try {
-        stream.close();
-      } catch (Throwable e) {
+        stream.write(data);
+        stream.hsync();
+        assertFalse(fs.isFileClosed(file));
+
+        // close OM
+        if (om.stop()) {
+          om.join();
+        }
+        assertThrows(ConnectException.class, () -> fs.recoverLease(file));
+      } finally {
+        try {
+          stream.close();
+        } catch (Throwable e) {
+        }
       }
-      cluster.getOzoneManager().restart();
+
+      om.restart();
       cluster.waitForClusterToBeReady();
       assertTrue(fs.recoverLease(file));
     }
@@ -473,7 +495,6 @@ public class TestLeaseRecovery {
   public void testRecoverWrongFile() throws Exception {
     final Path notExistFile = new Path(dir, "file1");
 
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
     int dataSize = 100;
     final byte[] data = getData(dataSize);
 
@@ -491,8 +512,6 @@ public class TestLeaseRecovery {
 
   @Test
   public void testRecoveryWithoutBlocks() throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-
     final FSDataOutputStream stream = fs.create(file, true);
     try {
       stream.hsync();
@@ -512,7 +531,6 @@ public class TestLeaseRecovery {
 
   @Test
   public void testRecoveryWithPartialFilledHsyncBlock() throws Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
     int blockSize = (int) 
cluster.getOzoneManager().getConfiguration().getStorageSize(
         OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
     final byte[] data = getData(blockSize - 1);
@@ -524,10 +542,9 @@ public class TestLeaseRecovery {
       stream.hsync();
 
       StorageContainerManager scm = cluster.getStorageContainerManager();
-      ContainerInfo container = 
scm.getContainerManager().getContainers().get(0);
       // Close container so that new data won't be written into the same block
       // block1 is partially filled
-      OzoneTestUtils.closeContainer(scm, container);
+      ContainerInfo container = closeLatestContainer();
       GenericTestUtils.waitFor(() -> {
         try {
           return 
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
@@ -560,9 +577,15 @@ public class TestLeaseRecovery {
     verifyData(data, (blockSize - 1) * 2, file, fs);
   }
 
+  private ContainerInfo closeLatestContainer() throws IOException, 
TimeoutException, InterruptedException {
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    ContainerInfo container = new 
LinkedList<>(scm.getContainerManager().getContainers()).getLast();
+    OzoneTestUtils.closeContainer(scm, container);
+    return container;
+  }
+
   @Test
   public void testRecoveryWithSameBlockCountInOpenFileAndFileTable() throws 
Exception {
-    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
     int blockSize = (int) 
cluster.getOzoneManager().getConfiguration().getStorageSize(
         OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
     final byte[] data = getData(blockSize / 2 - 1);
@@ -598,8 +621,8 @@ public class TestLeaseRecovery {
     verifyData(data, (blockSize / 2 - 1) * 2, file, fs);
   }
 
-  private void verifyData(byte[] data, int dataSize, Path filePath, 
RootedOzoneFileSystem fs) throws IOException {
-    try (FSDataInputStream fdis = fs.open(filePath)) {
+  private void verifyData(byte[] data, int dataSize, Path filePath, FileSystem 
fileSystem) throws IOException {
+    try (FSDataInputStream fdis = fileSystem.open(filePath)) {
       int bufferSize = dataSize > data.length ? dataSize / 2 : dataSize;
       while (dataSize > 0) {
         byte[] readData = new byte[bufferSize];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to