This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c387656530 HDDS-12063. Speed up TestLeaseRecovery (#7688)
c387656530 is described below
commit c3876565308902cbe96fb9b9ce0b8d9030f6e7c9
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Jan 13 15:03:21 2025 +0100
HDDS-12063. Speed up TestLeaseRecovery (#7688)
---
.../apache/hadoop/fs/ozone/TestLeaseRecovery.java | 237 +++++++++++----------
1 file changed, 130 insertions(+), 107 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
index d488530033..f178bf2435 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
@@ -41,14 +41,22 @@ import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.utils.FaultInjectorImpl;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.OzoneTestBase;
import org.apache.ozone.test.tag.Flaky;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -58,8 +66,10 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
+import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT;
@@ -75,6 +85,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -83,7 +94,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
@Timeout(300)
@Flaky("HDDS-11323")
-public class TestLeaseRecovery {
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TestLeaseRecovery extends OzoneTestBase {
+
+ private static final AtomicInteger FILE_COUNTER = new AtomicInteger();
private MiniOzoneCluster cluster;
private OzoneBucket bucket;
@@ -92,6 +107,8 @@ public class TestLeaseRecovery {
private final OzoneConfiguration conf = new OzoneConfiguration();
private String dir;
private Path file;
+ private GenericTestUtils.LogCapturer xceiverClientLogs;
+ private RootedOzoneFileSystem fs;
/**
* Closing the output stream after lease recovery throws because the key
@@ -104,12 +121,15 @@ public class TestLeaseRecovery {
public static void closeIgnoringOMException(OutputStream stream,
OMException.ResultCodes expectedResultCode) {
try {
stream.close();
- } catch (IOException e) {
- assertEquals(expectedResultCode, ((OMException)e).getResult());
+ } catch (OMException e) {
+ assertEquals(expectedResultCode, e.getResult());
+ } catch (Exception e) {
+ OMException omException = assertInstanceOf(OMException.class, e.getCause());
+ assertEquals(expectedResultCode, omException.getResult());
}
}
- @BeforeEach
+ @BeforeAll
public void init() throws IOException, InterruptedException,
TimeoutException {
final int chunkSize = 16 << 10;
@@ -120,6 +140,7 @@ public class TestLeaseRecovery {
conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+ conf.setBoolean("fs." + OZONE_OFS_URI_SCHEME + ".impl.disable.cache", true);
conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
@@ -152,10 +173,24 @@ public class TestLeaseRecovery {
final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME,
conf.get(OZONE_OM_ADDRESS_KEY));
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER +
bucket.getName();
- file = new Path(dir, "file");
+
+ xceiverClientLogs = GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+ }
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ file = new Path(dir, "file-" + getTestName() + "-" + FILE_COUNTER.incrementAndGet());
+ fs = (RootedOzoneFileSystem) FileSystem.get(conf);
}
@AfterEach
+ void afterEach() {
+ IOUtils.closeQuietly(fs);
+ xceiverClientLogs.clearOutput();
+ KeyValueHandler.setInjector(null);
+ }
+
+ @AfterAll
public void tearDown() {
IOUtils.closeQuietly(client);
if (cluster != null) {
@@ -166,8 +201,6 @@ public class TestLeaseRecovery {
@ParameterizedTest
@ValueSource(ints = {1 << 17, (1 << 17) + 1, (1 << 17) - 1})
public void testRecovery(int dataSize) throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-
final byte[] data = getData(dataSize);
final FSDataOutputStream stream = fs.create(file, true);
@@ -199,8 +232,6 @@ public class TestLeaseRecovery {
@Test
public void testRecoveryWithoutHsyncHflushOnLastBlock() throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-
int blockSize = (int)
cluster.getOzoneManager().getConfiguration().getStorageSize(
OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
@@ -235,24 +266,16 @@ public class TestLeaseRecovery {
@Test
public void testOBSRecoveryShouldFail() throws Exception {
- // Set the fs.defaultFS
- bucket = TestDataUtil.createVolumeAndBucket(client,
+ OzoneBucket obsBucket = TestDataUtil.createVolumeAndBucket(client,
"vol2", "obs", BucketLayout.OBJECT_STORE);
- final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME,
- conf.get(OZONE_OM_ADDRESS_KEY));
- conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+ String obsDir = OZONE_ROOT + obsBucket.getVolumeName() + OZONE_URI_DELIMITER + obsBucket.getName();
+ Path obsFile = new Path(obsDir, "file" + getTestName() + FILE_COUNTER.incrementAndGet());
- final String directory = OZONE_ROOT + bucket.getVolumeName() +
- OZONE_URI_DELIMITER + bucket.getName();
- final Path f = new Path(directory, "file");
-
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
- assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f));
+ assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(obsFile));
}
@Test
public void testFinalizeBlockFailure() throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
int dataSize = 100;
final byte[] data = getData(dataSize);
@@ -294,7 +317,6 @@ public class TestLeaseRecovery {
@Test
public void testBlockPipelineClosed() throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
int dataSize = 100;
final byte[] data = getData(dataSize);
@@ -310,8 +332,7 @@ public class TestLeaseRecovery {
// close the pipeline
StorageContainerManager scm = cluster.getStorageContainerManager();
- ContainerInfo container =
scm.getContainerManager().getContainers().get(0);
- OzoneTestUtils.closeContainer(scm, container);
+ ContainerInfo container = closeLatestContainer();
GenericTestUtils.waitFor(() -> {
try {
return
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
@@ -338,62 +359,59 @@ public class TestLeaseRecovery {
@ValueSource(booleans = {false, true})
public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws
Exception {
// reduce read timeout
- conf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
+ OzoneConfiguration clientConf = new OzoneConfiguration(conf);
+ clientConf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
// set force recovery
System.setProperty(FORCE_LEASE_RECOVERY_ENV,
String.valueOf(forceRecovery));
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
- int dataSize = 100;
- final byte[] data = getData(dataSize);
+ try (RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(clientConf)) {
+ int dataSize = 100;
+ final byte[] data = getData(dataSize);
- final FSDataOutputStream stream = fs.create(file, true);
- try {
- stream.write(data);
- stream.hsync();
- assertFalse(fs.isFileClosed(file));
-
- // write more data without hsync
- stream.write(data);
- stream.flush();
-
- // close the pipeline and container
- ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
- OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(),
container);
- // pause getCommittedBlockLength handling on all DNs to make sure all
getCommittedBlockLength will time out
- FaultInjectorImpl injector = new FaultInjectorImpl();
- injector.setType(ContainerProtos.Type.GetCommittedBlockLength);
- KeyValueHandler.setInjector(injector);
- GenericTestUtils.LogCapturer logs =
-
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
- if (!forceRecovery) {
- assertThrows(IOException.class, () -> fs.recoverLease(file));
- return;
- } else {
- fs.recoverLease(file);
+ final FSDataOutputStream stream = fs.create(file, true);
+ try {
+ stream.write(data);
+ stream.hsync();
+ assertFalse(fs.isFileClosed(file));
+
+ // write more data without hsync
+ stream.write(data);
+ stream.flush();
+
+ // close the pipeline and container
+ closeLatestContainer();
+ // pause getCommittedBlockLength handling on all DNs to make sure all
getCommittedBlockLength will time out
+ FaultInjectorImpl injector = new FaultInjectorImpl();
+ injector.setType(ContainerProtos.Type.GetCommittedBlockLength);
+ KeyValueHandler.setInjector(injector);
+ if (!forceRecovery) {
+ assertThrows(IOException.class, () -> fs.recoverLease(file));
+ return;
+ } else {
+ fs.recoverLease(file);
+ }
+ assertEquals(3, StringUtils.countMatches(xceiverClientLogs.getOutput(),
+ "Executing command cmdType: GetCommittedBlockLength"));
+
+ // The lease should have been recovered.
+ assertTrue(fs.isFileClosed(file), "File should be closed");
+ FileStatus fileStatus = fs.getFileStatus(file);
+ // Since all DNs are out, then the length in OM keyInfo will be used
as the final file length
+ assertEquals(dataSize, fileStatus.getLen());
+ } finally {
+ if (!forceRecovery) {
+ closeIgnoringOMException(stream,
OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY);
+ } else {
+ closeIgnoringKeyNotFound(stream);
+ }
}
- assertEquals(3, StringUtils.countMatches(logs.getOutput(),
- "Executing command cmdType: GetCommittedBlockLength"));
- // The lease should have been recovered.
- assertTrue(fs.isFileClosed(file), "File should be closed");
- FileStatus fileStatus = fs.getFileStatus(file);
- // Since all DNs are out, then the length in OM keyInfo will be used as
the final file length
- assertEquals(dataSize, fileStatus.getLen());
- } finally {
- if (!forceRecovery) {
- closeIgnoringOMException(stream,
OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY);
- } else {
- closeIgnoringKeyNotFound(stream);
- }
- KeyValueHandler.setInjector(null);
+ // open it again, make sure the data is correct
+ verifyData(data, dataSize, file, fs);
}
-
- // open it again, make sure the data is correct
- verifyData(data, dataSize, file, fs);
}
@Test
public void testGetCommittedBlockLengthWithException() throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
int dataSize = 100;
final byte[] data = getData(dataSize);
@@ -408,8 +426,7 @@ public class TestLeaseRecovery {
stream.flush();
// close the pipeline and container
- ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
- OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(),
container);
+ ContainerInfo container = closeLatestContainer();
// throw exception on first DN getCommittedBlockLength handling
FaultInjectorImpl injector = new FaultInjectorImpl();
KeyValueHandler.setInjector(injector);
@@ -418,14 +435,13 @@ public class TestLeaseRecovery {
ContainerProtos.Result.CONTAINER_NOT_FOUND);
injector.setException(sce);
- GenericTestUtils.LogCapturer logs =
-
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
fs.recoverLease(file);
- assertEquals(2, StringUtils.countMatches(logs.getOutput(),
- "Executing command cmdType: GetCommittedBlockLength"));
- assertEquals(1, StringUtils.countMatches(logs.getOutput(),
- "Failed to execute command cmdType: GetCommittedBlockLength"));
+ String output = xceiverClientLogs.getOutput();
+ assertEquals(2, StringUtils.countMatches(output,
+ "Executing command cmdType: GetCommittedBlockLength"), output);
+ assertEquals(1, StringUtils.countMatches(output,
+ "Failed to execute command cmdType: GetCommittedBlockLength"), output);
// The lease should have been recovered.
assertTrue(fs.isFileClosed(file), "File should be closed");
@@ -433,7 +449,6 @@ public class TestLeaseRecovery {
assertEquals(dataSize * 2, fileStatus.getLen());
} finally {
closeIgnoringKeyNotFound(stream);
- KeyValueHandler.setInjector(null);
}
// open it again, make sure the data is correct
@@ -441,29 +456,36 @@ public class TestLeaseRecovery {
}
@Test
+ @Order(Integer.MAX_VALUE)
public void testOMConnectionFailure() throws Exception {
// reduce hadoop RPC retry max attempts
- conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
- conf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
- int dataSize = 100;
- final byte[] data = getData(dataSize);
-
- final FSDataOutputStream stream = fs.create(file, true);
- try {
- stream.write(data);
- stream.hsync();
- assertFalse(fs.isFileClosed(file));
-
- // close OM
- cluster.getOzoneManager().stop();
- assertThrows(ConnectException.class, () -> fs.recoverLease(file));
- } finally {
+ OzoneConfiguration clientConf = new OzoneConfiguration(conf);
+ clientConf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
+ clientConf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
+ try (RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(clientConf)) {
+ int dataSize = 100;
+ final byte[] data = getData(dataSize);
+
+ final FSDataOutputStream stream = fs.create(file, true);
+ OzoneManager om = cluster.getOzoneManager();
try {
- stream.close();
- } catch (Throwable e) {
+ stream.write(data);
+ stream.hsync();
+ assertFalse(fs.isFileClosed(file));
+
+ // close OM
+ if (om.stop()) {
+ om.join();
+ }
+ assertThrows(ConnectException.class, () -> fs.recoverLease(file));
+ } finally {
+ try {
+ stream.close();
+ } catch (Throwable e) {
+ }
}
- cluster.getOzoneManager().restart();
+
+ om.restart();
cluster.waitForClusterToBeReady();
assertTrue(fs.recoverLease(file));
}
@@ -473,7 +495,6 @@ public class TestLeaseRecovery {
public void testRecoverWrongFile() throws Exception {
final Path notExistFile = new Path(dir, "file1");
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
int dataSize = 100;
final byte[] data = getData(dataSize);
@@ -491,8 +512,6 @@ public class TestLeaseRecovery {
@Test
public void testRecoveryWithoutBlocks() throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
-
final FSDataOutputStream stream = fs.create(file, true);
try {
stream.hsync();
@@ -512,7 +531,6 @@ public class TestLeaseRecovery {
@Test
public void testRecoveryWithPartialFilledHsyncBlock() throws Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
int blockSize = (int)
cluster.getOzoneManager().getConfiguration().getStorageSize(
OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
final byte[] data = getData(blockSize - 1);
@@ -524,10 +542,9 @@ public class TestLeaseRecovery {
stream.hsync();
StorageContainerManager scm = cluster.getStorageContainerManager();
- ContainerInfo container =
scm.getContainerManager().getContainers().get(0);
// Close container so that new data won't be written into the same block
// block1 is partially filled
- OzoneTestUtils.closeContainer(scm, container);
+ ContainerInfo container = closeLatestContainer();
GenericTestUtils.waitFor(() -> {
try {
return
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
@@ -560,9 +577,15 @@ public class TestLeaseRecovery {
verifyData(data, (blockSize - 1) * 2, file, fs);
}
+ private ContainerInfo closeLatestContainer() throws IOException, TimeoutException, InterruptedException {
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ ContainerInfo container = new LinkedList<>(scm.getContainerManager().getContainers()).getLast();
+ OzoneTestUtils.closeContainer(scm, container);
+ return container;
+ }
+
@Test
public void testRecoveryWithSameBlockCountInOpenFileAndFileTable() throws
Exception {
- RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
int blockSize = (int)
cluster.getOzoneManager().getConfiguration().getStorageSize(
OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
final byte[] data = getData(blockSize / 2 - 1);
@@ -598,8 +621,8 @@ public class TestLeaseRecovery {
verifyData(data, (blockSize / 2 - 1) * 2, file, fs);
}
- private void verifyData(byte[] data, int dataSize, Path filePath, RootedOzoneFileSystem fs) throws IOException {
- try (FSDataInputStream fdis = fs.open(filePath)) {
+ private void verifyData(byte[] data, int dataSize, Path filePath, FileSystem fileSystem) throws IOException {
+ try (FSDataInputStream fdis = fileSystem.open(filePath)) {
int bufferSize = dataSize > data.length ? dataSize / 2 : dataSize;
while (dataSize > 0) {
byte[] readData = new byte[bufferSize];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]