Repository: hadoop Updated Branches: refs/heads/branch-2 3793d2b9c -> 3fbcf7632
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbcf763/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/CleanupTestContainers.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/CleanupTestContainers.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/CleanupTestContainers.java new file mode 100644 index 0000000..059a8c4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/CleanupTestContainers.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.integration; + +import java.util.EnumSet; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.junit.Test; + +import org.apache.hadoop.fs.azure.AbstractWasbTestBase; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; + +/** + * This looks like a test, but it is really a command to invoke to + * clean up containers created in other test runs. 
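+ * All containers created by the WASB test suites share the
+ * {@code wasbtests-} name prefix, which is what this suite matches on:
+ * {@code testEnumContainers} only lists the matching containers, while
+ * {@code testDeleteContainers} deletes them. Run it on its own against
+ * the configured test account, for example with
+ * {@code mvn test -Dtest=CleanupTestContainers}.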
+ * + */ +public class CleanupTestContainers extends AbstractWasbTestBase { + + private static final String CONTAINER_PREFIX = "wasbtests-"; + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create( + "CleanupTestContainers", + EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), + createConfiguration(), + true); + } + + @Test + public void testEnumContainers() throws Throwable { + describe("Enumerating all the WASB test containers"); + + int count = 0; + CloudStorageAccount storageAccount = getTestAccount().getRealAccount(); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + Iterable<CloudBlobContainer> containers + = blobClient.listContainers(CONTAINER_PREFIX); + for (CloudBlobContainer container : containers) { + count++; + LOG.info("Container {} URI {}", + container.getName(), + container.getUri()); + } + LOG.info("Found {} test containers", count); + } + + @Test + public void testDeleteContainers() throws Throwable { + describe("Delete all the WASB test containers"); + int count = 0; + CloudStorageAccount storageAccount = getTestAccount().getRealAccount(); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + Iterable<CloudBlobContainer> containers + = blobClient.listContainers(CONTAINER_PREFIX); + for (CloudBlobContainer container : containers) { + LOG.info("Container {} URI {}", + container.getName(), + container.getUri()); + if (container.deleteIfExists()) { + count++; + } + } + LOG.info("Deleted {} test containers", count); + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbcf763/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/ITestAzureHugeFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/ITestAzureHugeFiles.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/ITestAzureHugeFiles.java new file mode 100644 index 0000000..850aca1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/ITestAzureHugeFiles.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.azure.integration;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+
+
+/**
+ * Scale test which creates a huge file.
+ *
+ * <b>Important:</b> the order in which these tests execute is fixed to
+ * alphabetical order. Test cases are numbered {@code test_123_} to impose
+ * an ordering based on the numbers.
+ *
+ * Having this ordering allows the tests to assume that the huge file
+ * exists. Even so: they should all have an {@link #assumeHugeFileExists()}
+ * check at the start, in case an individual test is executed.
+ *
+ * <b>Ignore checkstyle complaints about naming: we need a scheme with visible
+ * ordering.</b>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestAzureHugeFiles extends AbstractAzureScaleTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestAzureHugeFiles.class);
+
+  private Path scaleTestDir;
+  private Path hugefile;
+  private Path hugefileRenamed;
+  private AzureBlobStorageTestAccount testAccountForCleanup;
+
+  private static final int UPLOAD_BLOCKSIZE = 64 * S_1K;
+  private static final byte[] SOURCE_DATA;
+
+  static {
+    SOURCE_DATA = dataset(UPLOAD_BLOCKSIZE, 0, S_256);
+  }
+
+  private Path testPath;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    testPath = path("ITestAzureHugeFiles");
+    scaleTestDir = new Path(testPath, "scale");
+    hugefile = new Path(scaleTestDir, "hugefile");
+    hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+  }
+
+  /**
+   * Only clean up the test account (and delete the container) if the account
+   * is set in the field {@code testAccountForCleanup}.
+   * @throws Exception on any teardown failure
+   */
+  @Override
+  public void tearDown() throws Exception {
+    testAccount = null;
+    super.tearDown();
+    if (testAccountForCleanup != null) {
+      // clean up the account which test_999 marked for deletion,
+      // not the (already nulled) testAccount field
+      cleanupTestAccount(testAccountForCleanup);
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create(
+        "testazurehugefiles",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        createConfiguration(),
+        true);
+  }
+
+  /**
+   * Stop the test-case teardown from deleting the test path.
+   * @throws IOException never
+   */
+  protected void deleteTestDirInTeardown() throws IOException {
+    // this is a no-op, so the test file is preserved.
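+    // (the ordered test cases which follow read and rename the file
+    // created in test_010)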
+ // the last test in the suite does the teardown + } + + protected void deleteHugeFile() throws IOException { + describe("Deleting %s", hugefile); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + getFileSystem().delete(hugefile, false); + timer.end("time to delete %s", hugefile); + } + + /** + * Log how long an IOP took, by dividing the total time by the + * count of operations, printing in a human-readable form. + * @param operation operation being measured + * @param timer timing data + * @param count IOP count. + */ + protected void logTimePerIOP(String operation, + ContractTestUtils.NanoTimer timer, + long count) { + LOG.info("Time per {}: {} nS", + operation, toHuman(timer.duration() / count)); + } + + /** + * Assume that the huge file exists, skip if not/empty. + * @return the file status + * @throws IOException IO failure + */ + FileStatus assumeHugeFileExists() throws IOException { + assertPathExists(getFileSystem(), "huge file not created", hugefile); + try { + FileStatus status = getFileSystem().getFileStatus(hugefile); + Assume.assumeTrue("Not a file: " + status, status.isFile()); + Assume.assumeTrue("File " + hugefile + " is empty", status.getLen() > 0); + return status; + } catch (FileNotFoundException e) { + skip("huge file not created: " + hugefile); + } + return null; + } + + /** + * If/when {@link NativeAzureFileSystem#getStorageStatistics()} returns + * statistics, this will be interesting. + */ + private void logFSState() { + StorageStatistics statistics = getFileSystem().getStorageStatistics(); + Iterator<StorageStatistics.LongStatistic> longStatistics + = statistics.getLongStatistics(); + while (longStatistics.hasNext()) { + StorageStatistics.LongStatistic next = longStatistics.next(); + LOG.info("{} = {}", next.getName(), next.getValue()); + } + } + + @Test + public void test_010_CreateHugeFile() throws IOException { + long filesize = getTestPropertyBytes(getConfiguration(), + KEY_HUGE_FILESIZE, + DEFAULT_HUGE_FILESIZE); + long filesizeMB = filesize / S_1M; + + // clean up from any previous attempts + deleteHugeFile(); + + describe("Creating file %s of size %d MB", hugefile, filesizeMB); + + // now do a check of available upload time, with a pessimistic bandwidth + // (that of remote upload tests). If the test times out then not only is + // the test outcome lost, as the follow-on tests continue, they will + // overlap with the ongoing upload test, for much confusion. +/* + int timeout = getTestTimeoutSeconds(); + // assume 1 MB/s upload bandwidth + int bandwidth = _1MB; + long uploadTime = filesize / bandwidth; + assertTrue(String.format("Timeout set in %s seconds is too low;" + + " estimating upload time of %d seconds at 1 MB/s." + + " Rerun tests with -D%s=%d", + timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2), + uploadTime < timeout); +*/ + assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize + + " is not a multiple of " + UPLOAD_BLOCKSIZE, + 0, filesize % UPLOAD_BLOCKSIZE); + + byte[] data = SOURCE_DATA; + + long blocks = filesize / UPLOAD_BLOCKSIZE; + long blocksPerMB = S_1M / UPLOAD_BLOCKSIZE; + + // perform the upload. + // there's lots of logging here, so that a tail -f on the output log + // can give a view of what is happening. 
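+    // progress is logged every 10 MB written and again at 100%.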
+    NativeAzureFileSystem fs = getFileSystem();
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    long blocksPer10MB = blocksPerMB * 10;
+    fs.mkdirs(hugefile.getParent());
+    try (FSDataOutputStream out = fs.create(hugefile,
+        true,
+        UPLOAD_BLOCKSIZE,
+        null)) {
+      for (long block = 1; block <= blocks; block++) {
+        out.write(data);
+        long written = block * UPLOAD_BLOCKSIZE;
+        // every 10 MB, and at 100% of the upload, print some stats
+        if (block % blocksPer10MB == 0 || written == filesize) {
+          long percentage = written * 100 / filesize;
+          double elapsedTime = timer.elapsedTime() / NANOSEC;
+          double writtenMB = 1.0 * written / S_1M;
+          LOG.info(String.format("[%02d%%] Buffered %.2f MB out of %d MB;"
+                  + " elapsedTime=%.2fs; write to buffer bandwidth=%.2f MB/s",
+              percentage,
+              writtenMB,
+              filesizeMB,
+              elapsedTime,
+              writtenMB / elapsedTime));
+        }
+      }
+      // now close the file
+      LOG.info("Closing stream {}", out);
+      ContractTestUtils.NanoTimer closeTimer
+          = new ContractTestUtils.NanoTimer();
+      out.close();
+      closeTimer.end("time to close() output stream");
+    }
+
+    timer.end("time to write %d MB in blocks of %d",
+        filesizeMB, UPLOAD_BLOCKSIZE);
+    logFSState();
+    bandwidth(timer, filesize);
+    ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertEquals("File size in " + status, filesize, status.getLen());
+  }
+
+  @Test
+  public void test_040_PositionedReadHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("Positioned reads of file %s", hugefile);
+    NativeAzureFileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    int ops = 0;
+    final int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+    long eof = filesize - 1;
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
+    try (FSDataInputStream in = openDataFile()) {
+      readAtByte0 = new ContractTestUtils.NanoTimer();
+      in.readFully(0, buffer);
+      readAtByte0.end("time to read data at start of file");
+      ops++;
+
+      readAtEOF = new ContractTestUtils.NanoTimer();
+      in.readFully(eof - bufferSize, buffer);
+      readAtEOF.end("time to read data at end of file");
+      ops++;
+
+      readAtByte0Again = new ContractTestUtils.NanoTimer();
+      in.readFully(0, buffer);
+      readAtByte0Again.end("time to read data at start of file again");
+      ops++;
+      LOG.info("Final stream state: {}", in);
+    }
+    long mb = Math.max(filesize / S_1M, 1);
+
+    logFSState();
+    timer.end("time to perform positioned reads of %d MB", mb);
+    LOG.info("Time per positioned read = {} nS",
+        toHuman(timer.nanosPerOperation(ops)));
+  }
+
+  protected FSDataInputStream openDataFile() throws IOException {
+    NanoTimer openTimer = new NanoTimer();
+    FSDataInputStream inputStream = getFileSystem().open(hugefile,
+        UPLOAD_BLOCKSIZE);
+    openTimer.end("open data file");
+    return inputStream;
+  }
+
+  /**
+   * Work out the bandwidth in bytes/second.
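+   * The timer measures in nanoseconds, hence the scaling by {@code NANOSEC}.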
+   * @param timer timer measuring the duration
+   * @param bytes bytes
+   * @return the number of bytes/second of the recorded operation
+   */
+  public static double bandwidthInBytes(NanoTimer timer, long bytes) {
+    return bytes * NANOSEC / timer.duration();
+  }
+
+  @Test
+  public void test_050_readHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("Reading %s", hugefile);
+    NativeAzureFileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    long blocks = filesize / UPLOAD_BLOCKSIZE;
+    byte[] data = new byte[UPLOAD_BLOCKSIZE];
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (FSDataInputStream in = openDataFile()) {
+      for (long block = 0; block < blocks; block++) {
+        in.readFully(data);
+      }
+      LOG.info("Final stream state: {}", in);
+    }
+
+    long mb = Math.max(filesize / S_1M, 1);
+    timer.end("time to read file of %d MB", mb);
+    LOG.info("Time per MB to read = {} nS",
+        toHuman(timer.nanosPerOperation(mb)));
+    bandwidth(timer, filesize);
+    logFSState();
+  }
+
+  @Test
+  public void test_060_openAndReadWholeFileBlocks() throws Throwable {
+    FileStatus status = assumeHugeFileExists();
+    int blockSize = S_1M;
+    describe("Open the test file and read it in blocks of size %d",
+        blockSize);
+    long len = status.getLen();
+    FSDataInputStream in = openDataFile();
+    NanoTimer timer2 = null;
+    long blockCount = 0;
+    long totalToRead = 0;
+    int resetCount = 0;
+    try {
+      byte[] block = new byte[blockSize];
+      timer2 = new NanoTimer();
+      long count = 0;
+      // implicitly rounding down here
+      blockCount = len / blockSize;
+      totalToRead = blockCount * blockSize;
+      long minimumBandwidth = S_128K;
+      int maxResetCount = 4;
+      resetCount = 0;
+      for (long i = 0; i < blockCount; i++) {
+        int offset = 0;
+        int remaining = blockSize;
+        long blockId = i + 1;
+        NanoTimer blockTimer = new NanoTimer();
+        int reads = 0;
+        while (remaining > 0) {
+          NanoTimer readTimer = new NanoTimer();
+          int bytesRead = in.read(block, offset, remaining);
+          reads++;
+          if (bytesRead < 0) {
+            // EOF
+            break;
+          }
+          remaining -= bytesRead;
+          offset += bytesRead;
+          count += bytesRead;
+          readTimer.end();
+          if (bytesRead != 0) {
+            LOG.debug("Bytes in read #{}: {}, block bytes: {},"
+                    + " remaining in block: {}"
+                    + " duration={} nS; ns/byte: {}, bandwidth={} MB/s",
+                reads, bytesRead, blockSize - remaining, remaining,
+                readTimer.duration(),
+                readTimer.nanosPerOperation(bytesRead),
+                readTimer.bandwidthDescription(bytesRead));
+          } else {
+            LOG.warn("0 bytes returned by read() operation #{}", reads);
+          }
+        }
+        blockTimer.end("Reading block %d in %d reads", blockId, reads);
+        String bw = blockTimer.bandwidthDescription(blockSize);
+        LOG.info("Bandwidth of block {}: {} MB/s", blockId, bw);
+        if (bandwidthInBytes(blockTimer, blockSize) < minimumBandwidth) {
+          LOG.warn("Bandwidth {} too low on block {}: resetting connection",
+              bw, blockId);
+          Assert.assertTrue("Bandwidth of " + bw + " too low after "
+              + resetCount + " attempts", resetCount <= maxResetCount);
+          resetCount++;
+          // reset the connection
+        }
+      }
+    } finally {
+      IOUtils.closeStream(in);
+    }
+    timer2.end("Time to read %d bytes in %d blocks", totalToRead, blockCount);
+    LOG.info("Overall Bandwidth {} MB/s; reset connections {}",
+        timer2.bandwidth(totalToRead), resetCount);
+  }
+
+  @Test
+  public void test_100_renameHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("renaming %s to %s", hugefile, hugefileRenamed);
+    NativeAzureFileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    fs.delete(hugefileRenamed, false);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    fs.rename(hugefile, hugefileRenamed);
+    long mb = Math.max(filesize / S_1M, 1);
+    timer.end("time to rename file of %d MB", mb);
+    LOG.info("Time per MB to rename = {} nS",
+        toHuman(timer.nanosPerOperation(mb)));
+    bandwidth(timer, filesize);
+    logFSState();
+    FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
+    assertEquals(filesize, destFileStatus.getLen());
+
+    // rename back
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+    fs.rename(hugefileRenamed, hugefile);
+    timer2.end("Renaming back");
+    LOG.info("Time per MB to rename = {} nS",
+        toHuman(timer2.nanosPerOperation(mb)));
+    bandwidth(timer2, filesize);
+  }
+
+  @Test
+  public void test_999_deleteHugeFiles() throws IOException {
+    // mark the test account for cleanup after this test
+    testAccountForCleanup = testAccount;
+    deleteHugeFile();
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+    NativeAzureFileSystem fs = getFileSystem();
+    fs.delete(hugefileRenamed, false);
+    timer2.end("time to delete %s", hugefileRenamed);
+    rm(fs, testPath, true, false);
+    assertPathDoesNotExist(fs, "deleted huge file", testPath);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbcf763/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/Sizes.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/Sizes.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/Sizes.java
new file mode 100644
index 0000000..92b10cf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/Sizes.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.integration;
+
+/**
+ * Sizes of data.
+ * Checkstyle doesn't like the naming scheme or the fact it's an interface.
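+ * These are constants rather than an enum so that test code can use them
+ * directly in integer arithmetic, e.g. {@code 64 * S_1K} for a 64 KB block.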
+ */
+public interface Sizes {
+
+  int S_256 = 256;
+  int S_512 = 512;
+  int S_1K = 1024;
+  int S_4K = 4 * S_1K;
+  int S_8K = 8 * S_1K;
+  int S_16K = 16 * S_1K;
+  int S_32K = 32 * S_1K;
+  int S_64K = 64 * S_1K;
+  int S_128K = 128 * S_1K;
+  int S_256K = 256 * S_1K;
+  int S_1M = S_1K * S_1K;
+  int S_2M = 2 * S_1M;
+  int S_5M = 5 * S_1M;
+  int S_10M = 10 * S_1M;
+  double NANOSEC = 1.0e9;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbcf763/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/ITestAzureFileSystemInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/ITestAzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/ITestAzureFileSystemInstrumentation.java
new file mode 100644
index 0000000..5f08d80
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/ITestAzureFileSystemInstrumentation.java
@@ -0,0 +1,586 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_CLIENT_ERRORS;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DIRECTORIES_CREATED;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_LATENCY;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_RATE;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_CREATED;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_DELETED;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_SERVER_ERRORS;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_LATENCY;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_RATE;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.verify;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.AbstractWasbTestBase;
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
+import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instrumentation test: performs filesystem operations and verifies that
+ * the metrics they generate are consistent.
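+ * Expected request counts are asserted as ranges rather than exact values,
+ * because the number of web responses per operation varies between client
+ * and service versions.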
+ */ +public class ITestAzureFileSystemInstrumentation extends AbstractWasbTestBase { + + protected static final Logger LOG = + LoggerFactory.getLogger(ITestAzureFileSystemInstrumentation.class); + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + + @Test + public void testMetricTags() throws Exception { + String accountName = + getTestAccount().getRealAccount().getBlobEndpoint() + .getAuthority(); + String containerName = + getTestAccount().getRealContainer().getName(); + MetricsRecordBuilder myMetrics = getMyMetrics(); + verify(myMetrics).add(argThat( + new TagMatcher("accountName", accountName) + )); + verify(myMetrics).add(argThat( + new TagMatcher("containerName", containerName) + )); + verify(myMetrics).add(argThat( + new TagMatcher("Context", "azureFileSystem") + )); + verify(myMetrics).add(argThat( + new TagExistsMatcher("wasbFileSystemId") + )); + } + + + @Test + public void testMetricsOnMkdirList() throws Exception { + long base = getBaseWebResponses(); + + // Create a directory + assertTrue(fs.mkdirs(new Path("a"))); + // At the time of writing + // getAncestor uses 2 calls for each folder level /user/<name>/a + // plus 1 call made by checkContainer + // mkdir checks the hierarchy with 2 calls per level + // mkdirs calls storeEmptyDir to create the empty folder, which makes 5 calls + // For a total of 7 + 6 + 5 = 18 web responses + base = assertWebResponsesInRange(base, 1, 18); + assertEquals(1, + AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED)); + + // List the root contents + assertEquals(1, getFileSystem().listStatus(new Path("/")).length); + base = assertWebResponsesEquals(base, 1); + + assertNoErrors(); + } + + private BandwidthGaugeUpdater getBandwidthGaugeUpdater() { + NativeAzureFileSystem azureFs = (NativeAzureFileSystem) getFileSystem(); + AzureNativeFileSystemStore azureStore = azureFs.getStore(); + return azureStore.getBandwidthGaugeUpdater(); + } + + private static byte[] nonZeroByteArray(int size) { + byte[] data = new byte[size]; + Arrays.fill(data, (byte)5); + return data; + } + + @Test + public void testMetricsOnFileCreateRead() throws Exception { + long base = getBaseWebResponses(); + + assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation())); + + Path filePath = new Path("/metricsTest_webResponses"); + final int FILE_SIZE = 1000; + + // Suppress auto-update of bandwidth metrics so we get + // to update them exactly when we want to. + getBandwidthGaugeUpdater().suppressAutoUpdate(); + + // Create a file + Date start = new Date(); + OutputStream outputStream = getFileSystem().create(filePath); + outputStream.write(nonZeroByteArray(FILE_SIZE)); + outputStream.close(); + long uploadDurationMs = new Date().getTime() - start.getTime(); + + // The exact number of requests/responses that happen to create a file + // can vary - at the time of writing this code it takes 10 + // requests/responses for the 1000 byte file (33 for 100 MB), + // plus the initial container-check request but that + // can very easily change in the future. Just assert that we do roughly + // more than 2 but less than 15. 
+ logOpResponseCount("Creating a 1K file", base); + base = assertWebResponsesInRange(base, 2, 15); + getBandwidthGaugeUpdater().triggerUpdate(true); + long bytesWritten = AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation()); + assertTrue("The bytes written in the last second " + bytesWritten + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + bytesWritten > (FILE_SIZE / 2) && bytesWritten < (FILE_SIZE * 2)); + long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation()); + assertTrue("The total bytes written " + totalBytesWritten + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2)); + long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE); + LOG.info("Upload rate: " + uploadRate + " bytes/second."); + long expectedRate = (FILE_SIZE * 1000L) / uploadDurationMs; + assertTrue("The upload rate " + uploadRate + + " is below the expected range of around " + expectedRate + + " bytes/second that the unit test observed. This should never be" + + " the case since the test underestimates the rate by looking at " + + " end-to-end time instead of just block upload time.", + uploadRate >= expectedRate); + long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_UPLOAD_LATENCY); + LOG.info("Upload latency: {}", uploadLatency); + long expectedLatency = uploadDurationMs; // We're uploading less than a block. + assertTrue("The upload latency " + uploadLatency + + " should be greater than zero now that I've just uploaded a file.", + uploadLatency > 0); + assertTrue("The upload latency " + uploadLatency + + " is more than the expected range of around " + expectedLatency + + " milliseconds that the unit test observed. This should never be" + + " the case since the test overestimates the latency by looking at " + + " end-to-end time instead of just block upload time.", + uploadLatency <= expectedLatency); + + // Read the file + start = new Date(); + InputStream inputStream = getFileSystem().open(filePath); + int count = 0; + while (inputStream.read() >= 0) { + count++; + } + inputStream.close(); + long downloadDurationMs = new Date().getTime() - start.getTime(); + assertEquals(FILE_SIZE, count); + + // Again, exact number varies. At the time of writing this code + // it takes 4 request/responses, so just assert a rough range between + // 1 and 10. + logOpResponseCount("Reading a 1K file", base); + base = assertWebResponsesInRange(base, 1, 10); + getBandwidthGaugeUpdater().triggerUpdate(false); + long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation()); + assertEquals(FILE_SIZE, totalBytesRead); + long bytesRead = AzureMetricsTestUtil.getCurrentBytesRead(getInstrumentation()); + assertTrue("The bytes read in the last second " + bytesRead + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + bytesRead > (FILE_SIZE / 2) && bytesRead < (FILE_SIZE * 2)); + long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE); + LOG.info("Download rate: " + downloadRate + " bytes/second."); + expectedRate = (FILE_SIZE * 1000L) / downloadDurationMs; + assertTrue("The download rate " + downloadRate + + " is below the expected range of around " + expectedRate + + " bytes/second that the unit test observed. 
This should never be" + + " the case since the test underestimates the rate by looking at " + + " end-to-end time instead of just block download time.", + downloadRate >= expectedRate); + long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_DOWNLOAD_LATENCY); + LOG.info("Download latency: " + downloadLatency); + expectedLatency = downloadDurationMs; // We're downloading less than a block. + assertTrue("The download latency " + downloadLatency + + " should be greater than zero now that I've just downloaded a file.", + downloadLatency > 0); + assertTrue("The download latency " + downloadLatency + + " is more than the expected range of around " + expectedLatency + + " milliseconds that the unit test observed. This should never be" + + " the case since the test overestimates the latency by looking at " + + " end-to-end time instead of just block download time.", + downloadLatency <= expectedLatency); + + assertNoErrors(); + } + + @Test + public void testMetricsOnBigFileCreateRead() throws Exception { + long base = getBaseWebResponses(); + + assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation())); + + Path filePath = new Path("/metricsTest_webResponses"); + final int FILE_SIZE = 100 * 1024 * 1024; + + // Suppress auto-update of bandwidth metrics so we get + // to update them exactly when we want to. + getBandwidthGaugeUpdater().suppressAutoUpdate(); + + // Create a file + OutputStream outputStream = getFileSystem().create(filePath); + outputStream.write(new byte[FILE_SIZE]); + outputStream.close(); + + // The exact number of requests/responses that happen to create a file + // can vary - at the time of writing this code it takes 34 + // requests/responses for the 100 MB file, + // plus the initial container check request, but that + // can very easily change in the future. Just assert that we do roughly + // more than 20 but less than 50. + logOpResponseCount("Creating a 100 MB file", base); + base = assertWebResponsesInRange(base, 20, 50); + getBandwidthGaugeUpdater().triggerUpdate(true); + long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation()); + assertTrue("The total bytes written " + totalBytesWritten + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2)); + long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE); + LOG.info("Upload rate: " + uploadRate + " bytes/second."); + long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_UPLOAD_LATENCY); + LOG.info("Upload latency: " + uploadLatency); + assertTrue("The upload latency " + uploadLatency + + " should be greater than zero now that I've just uploaded a file.", + uploadLatency > 0); + + // Read the file + InputStream inputStream = getFileSystem().open(filePath); + int count = 0; + while (inputStream.read() >= 0) { + count++; + } + inputStream.close(); + assertEquals(FILE_SIZE, count); + + // Again, exact number varies. At the time of writing this code + // it takes 27 request/responses, so just assert a rough range between + // 20 and 40. 
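+    // (the reads above were byte-at-a-time, yet the count stays low because
+    // the client downloads in larger blocks)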
+ logOpResponseCount("Reading a 100 MB file", base); + base = assertWebResponsesInRange(base, 20, 40); + getBandwidthGaugeUpdater().triggerUpdate(false); + long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation()); + assertEquals(FILE_SIZE, totalBytesRead); + long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE); + LOG.info("Download rate: " + downloadRate + " bytes/second."); + long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_DOWNLOAD_LATENCY); + LOG.info("Download latency: " + downloadLatency); + assertTrue("The download latency " + downloadLatency + + " should be greater than zero now that I've just downloaded a file.", + downloadLatency > 0); + } + + @Test + public void testMetricsOnFileRename() throws Exception { + long base = getBaseWebResponses(); + + Path originalPath = new Path("/metricsTest_RenameStart"); + Path destinationPath = new Path("/metricsTest_RenameFinal"); + + // Create an empty file + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED)); + assertTrue(getFileSystem().createNewFile(originalPath)); + logOpResponseCount("Creating an empty file", base); + base = assertWebResponsesInRange(base, 2, 20); + assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED)); + + // Rename the file + assertTrue( + ((FileSystem) getFileSystem()).rename(originalPath, destinationPath)); + // Varies: at the time of writing this code it takes 7 requests/responses. + logOpResponseCount("Renaming a file", base); + base = assertWebResponsesInRange(base, 2, 15); + + assertNoErrors(); + } + + @Test + public void testMetricsOnFileExistsDelete() throws Exception { + long base = getBaseWebResponses(); + + Path filePath = new Path("/metricsTest_delete"); + + // Check existence + assertFalse(getFileSystem().exists(filePath)); + // At the time of writing this code it takes 2 requests/responses to + // check existence, which seems excessive, plus initial request for + // container check, plus 2 ancestor checks only in the secure case. + logOpResponseCount("Checking file existence for non-existent file", base); + base = assertWebResponsesInRange(base, 1, 5); + + // Create an empty file + assertTrue(getFileSystem().createNewFile(filePath)); + base = getCurrentWebResponses(); + + // Check existence again + assertTrue(getFileSystem().exists(filePath)); + logOpResponseCount("Checking file existence for existent file", base); + base = assertWebResponsesInRange(base, 1, 4); + + // Delete the file + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED)); + assertTrue(getFileSystem().delete(filePath, false)); + // At the time of writing this code it takes 4 requests/responses to + // delete, which seems excessive. Check for range 1-4 for now. 
+ logOpResponseCount("Deleting a file", base); + base = assertWebResponsesInRange(base, 1, 4); + assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED)); + + assertNoErrors(); + } + + @Test + public void testMetricsOnDirRename() throws Exception { + long base = getBaseWebResponses(); + + Path originalDirName = new Path("/metricsTestDirectory_RenameStart"); + Path innerFileName = new Path(originalDirName, "innerFile"); + Path destDirName = new Path("/metricsTestDirectory_RenameFinal"); + + // Create an empty directory + assertTrue(getFileSystem().mkdirs(originalDirName)); + base = getCurrentWebResponses(); + + // Create an inner file + assertTrue(getFileSystem().createNewFile(innerFileName)); + base = getCurrentWebResponses(); + + // Rename the directory + assertTrue(getFileSystem().rename(originalDirName, destDirName)); + + // At the time of writing this code it takes 11 requests/responses + // to rename the directory with one file. Check for range 1-20 for now. + logOpResponseCount("Renaming a directory", base); + base = assertWebResponsesInRange(base, 1, 20); + + assertNoErrors(); + } + + /** + * Recursive discovery of path depth + * @param path path to measure. + * @return depth, where "/" == 0. + */ + int depth(Path path) { + if (path.isRoot()) { + return 0; + } else { + return 1 + depth(path.getParent()); + } + } + + @Test + public void testClientErrorMetrics() throws Exception { + String fileName = "metricsTestFile_ClientError"; + Path filePath = new Path("/"+fileName); + final int FILE_SIZE = 100; + OutputStream outputStream = null; + String leaseID = null; + try { + // Create a file + outputStream = getFileSystem().create(filePath); + leaseID = getTestAccount().acquireShortLease(fileName); + try { + outputStream.write(new byte[FILE_SIZE]); + outputStream.close(); + assertTrue("Should've thrown", false); + } catch (AzureException ex) { + assertTrue("Unexpected exception: " + ex, + ex.getMessage().contains("lease")); + } + assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS)); + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS)); + } finally { + if(leaseID != null){ + getTestAccount().releaseLease(leaseID, fileName); + } + IOUtils.closeStream(outputStream); + } + } + + private void logOpResponseCount(String opName, long base) { + LOG.info("{} took {} web responses to complete.", + opName, getCurrentWebResponses() - base); + } + + /** + * Gets (and asserts) the value of the wasb_web_responses counter just + * after the creation of the file system object. + */ + private long getBaseWebResponses() { + // The number of requests should start at 0 + return assertWebResponsesEquals(0, 0); + } + + /** + * Gets the current value of the wasb_web_responses counter. + */ + private long getCurrentWebResponses() { + return AzureMetricsTestUtil.getCurrentWebResponses(getInstrumentation()); + } + + /** + * Checks that the wasb_web_responses counter is at the given value. + * @param base The base value (before the operation of interest). + * @param expected The expected value for the operation of interest. + * @return The new base value now. 
+ */ + private long assertWebResponsesEquals(long base, long expected) { + assertCounter(WASB_WEB_RESPONSES, base + expected, getMyMetrics()); + return base + expected; + } + + private void assertNoErrors() { + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS)); + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS)); + } + + /** + * Checks that the wasb_web_responses counter is in the given range. + * @param base The base value (before the operation of interest). + * @param inclusiveLowerLimit The lower limit for what it should increase by. + * @param inclusiveUpperLimit The upper limit for what it should increase by. + * @return The new base value now. + */ + private long assertWebResponsesInRange(long base, + long inclusiveLowerLimit, + long inclusiveUpperLimit) { + long currentResponses = getCurrentWebResponses(); + long justOperation = currentResponses - base; + assertTrue(String.format( + "Web responses expected in range [%d, %d], but was %d.", + inclusiveLowerLimit, inclusiveUpperLimit, justOperation), + justOperation >= inclusiveLowerLimit && + justOperation <= inclusiveUpperLimit); + return currentResponses; + } + + /** + * Gets the metrics for the file system object. + * @return The metrics record. + */ + private MetricsRecordBuilder getMyMetrics() { + return getMetrics(getInstrumentation()); + } + + private AzureFileSystemInstrumentation getInstrumentation() { + return getFileSystem().getInstrumentation(); + } + + /** + * A matcher class for asserting that we got a tag with a given + * value. + */ + private static class TagMatcher extends TagExistsMatcher { + private final String tagValue; + + public TagMatcher(String tagName, String tagValue) { + super(tagName); + this.tagValue = tagValue; + } + + @Override + public boolean matches(MetricsTag toMatch) { + return toMatch.value().equals(tagValue); + } + + @Override + public void describeTo(Description desc) { + super.describeTo(desc); + desc.appendText(" with value " + tagValue); + } + } + + /** + * A matcher class for asserting that we got a tag with any value. + */ + private static class TagExistsMatcher extends BaseMatcher<MetricsTag> { + private final String tagName; + + public TagExistsMatcher(String tagName) { + this.tagName = tagName; + } + + @Override + public boolean matches(Object toMatch) { + MetricsTag asTag = (MetricsTag)toMatch; + return asTag.name().equals(tagName) && matches(asTag); + } + + protected boolean matches(MetricsTag toMatch) { + return true; + } + + @Override + public void describeTo(Description desc) { + desc.appendText("Has tag " + tagName); + } + } + + /** + * A matcher class for asserting that a long value is in a + * given range. 
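+   * Both limits are inclusive.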
+ */ + private static class InRange extends BaseMatcher<Long> { + private final long inclusiveLowerLimit; + private final long inclusiveUpperLimit; + private long obtained; + + public InRange(long inclusiveLowerLimit, long inclusiveUpperLimit) { + this.inclusiveLowerLimit = inclusiveLowerLimit; + this.inclusiveUpperLimit = inclusiveUpperLimit; + } + + @Override + public boolean matches(Object number) { + obtained = (Long)number; + return obtained >= inclusiveLowerLimit && + obtained <= inclusiveUpperLimit; + } + + @Override + public void describeTo(Description description) { + description.appendText("Between " + inclusiveLowerLimit + + " and " + inclusiveUpperLimit + " inclusively"); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbcf763/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java deleted file mode 100644 index 2bf3839..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java +++ /dev/null @@ -1,579 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs.azure.metrics; - -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_CLIENT_ERRORS; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DIRECTORIES_CREATED; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_LATENCY; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_RATE; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_CREATED; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_DELETED; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_SERVER_ERRORS; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_LATENCY; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_RATE; -import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeNotNull; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.verify; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Date; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; -import org.apache.hadoop.fs.azure.AzureException; -import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; -import org.apache.hadoop.fs.azure.NativeAzureFileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricsTag; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestAzureFileSystemInstrumentation { - private FileSystem fs; - private AzureBlobStorageTestAccount testAccount; - - @Before - public void setUp() throws Exception { - testAccount = AzureBlobStorageTestAccount.create(); - if (testAccount != null) { - fs = testAccount.getFileSystem(); - } - assumeNotNull(testAccount); - } - - @After - public void tearDown() throws Exception { - if (testAccount != null) { - testAccount.cleanup(); - testAccount = null; - fs = null; - } - } - - @Test - public void testMetricTags() throws Exception { - String accountName = - testAccount.getRealAccount().getBlobEndpoint() - .getAuthority(); - String containerName = - testAccount.getRealContainer().getName(); - MetricsRecordBuilder myMetrics = getMyMetrics(); - verify(myMetrics).add(argThat( - new TagMatcher("accountName", accountName) - )); - verify(myMetrics).add(argThat( - new TagMatcher("containerName", containerName) - )); - verify(myMetrics).add(argThat( - new TagMatcher("Context", "azureFileSystem") - )); - verify(myMetrics).add(argThat( - new TagExistsMatcher("wasbFileSystemId") - )); - } - - - @Test - public void testMetricsOnMkdirList() throws Exception { - long base = getBaseWebResponses(); - - // Create a directory - assertTrue(fs.mkdirs(new Path("a"))); - // At the time of writing - // getAncestor uses 2 calls for each folder level 
/user/<name>/a - // plus 1 call made by checkContainer - // mkdir checks the hierarchy with 2 calls per level - // mkdirs calls storeEmptyDir to create the empty folder, which makes 5 calls - // For a total of 7 + 6 + 5 = 18 web responses - base = assertWebResponsesInRange(base, 1, 18); - assertEquals(1, - AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED)); - - // List the root contents - assertEquals(1, fs.listStatus(new Path("/")).length); - base = assertWebResponsesEquals(base, 1); - - assertNoErrors(); - } - - private BandwidthGaugeUpdater getBandwidthGaugeUpdater() { - NativeAzureFileSystem azureFs = (NativeAzureFileSystem)fs; - AzureNativeFileSystemStore azureStore = azureFs.getStore(); - return azureStore.getBandwidthGaugeUpdater(); - } - - private static byte[] nonZeroByteArray(int size) { - byte[] data = new byte[size]; - Arrays.fill(data, (byte)5); - return data; - } - - @Test - public void testMetricsOnFileCreateRead() throws Exception { - long base = getBaseWebResponses(); - - assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation())); - - Path filePath = new Path("/metricsTest_webResponses"); - final int FILE_SIZE = 1000; - - // Suppress auto-update of bandwidth metrics so we get - // to update them exactly when we want to. - getBandwidthGaugeUpdater().suppressAutoUpdate(); - - // Create a file - Date start = new Date(); - OutputStream outputStream = fs.create(filePath); - outputStream.write(nonZeroByteArray(FILE_SIZE)); - outputStream.close(); - long uploadDurationMs = new Date().getTime() - start.getTime(); - - // The exact number of requests/responses that happen to create a file - // can vary - at the time of writing this code it takes 10 - // requests/responses for the 1000 byte file (33 for 100 MB), - // plus the initial container-check request but that - // can very easily change in the future. Just assert that we do roughly - // more than 2 but less than 15. - logOpResponseCount("Creating a 1K file", base); - base = assertWebResponsesInRange(base, 2, 15); - getBandwidthGaugeUpdater().triggerUpdate(true); - long bytesWritten = AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation()); - assertTrue("The bytes written in the last second " + bytesWritten + - " is pretty far from the expected range of around " + FILE_SIZE + - " bytes plus a little overhead.", - bytesWritten > (FILE_SIZE / 2) && bytesWritten < (FILE_SIZE * 2)); - long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation()); - assertTrue("The total bytes written " + totalBytesWritten + - " is pretty far from the expected range of around " + FILE_SIZE + - " bytes plus a little overhead.", - totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2)); - long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE); - System.out.println("Upload rate: " + uploadRate + " bytes/second."); - long expectedRate = (FILE_SIZE * 1000L) / uploadDurationMs; - assertTrue("The upload rate " + uploadRate + - " is below the expected range of around " + expectedRate + - " bytes/second that the unit test observed. 
This should never be" + - " the case since the test underestimates the rate by looking at " + - " end-to-end time instead of just block upload time.", - uploadRate >= expectedRate); - long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), - WASB_UPLOAD_LATENCY); - System.out.println("Upload latency: " + uploadLatency); - long expectedLatency = uploadDurationMs; // We're uploading less than a block. - assertTrue("The upload latency " + uploadLatency + - " should be greater than zero now that I've just uploaded a file.", - uploadLatency > 0); - assertTrue("The upload latency " + uploadLatency + - " is more than the expected range of around " + expectedLatency + - " milliseconds that the unit test observed. This should never be" + - " the case since the test overestimates the latency by looking at " + - " end-to-end time instead of just block upload time.", - uploadLatency <= expectedLatency); - - // Read the file - start = new Date(); - InputStream inputStream = fs.open(filePath); - int count = 0; - while (inputStream.read() >= 0) { - count++; - } - inputStream.close(); - long downloadDurationMs = new Date().getTime() - start.getTime(); - assertEquals(FILE_SIZE, count); - - // Again, exact number varies. At the time of writing this code - // it takes 4 request/responses, so just assert a rough range between - // 1 and 10. - logOpResponseCount("Reading a 1K file", base); - base = assertWebResponsesInRange(base, 1, 10); - getBandwidthGaugeUpdater().triggerUpdate(false); - long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation()); - assertEquals(FILE_SIZE, totalBytesRead); - long bytesRead = AzureMetricsTestUtil.getCurrentBytesRead(getInstrumentation()); - assertTrue("The bytes read in the last second " + bytesRead + - " is pretty far from the expected range of around " + FILE_SIZE + - " bytes plus a little overhead.", - bytesRead > (FILE_SIZE / 2) && bytesRead < (FILE_SIZE * 2)); - long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE); - System.out.println("Download rate: " + downloadRate + " bytes/second."); - expectedRate = (FILE_SIZE * 1000L) / downloadDurationMs; - assertTrue("The download rate " + downloadRate + - " is below the expected range of around " + expectedRate + - " bytes/second that the unit test observed. This should never be" + - " the case since the test underestimates the rate by looking at " + - " end-to-end time instead of just block download time.", - downloadRate >= expectedRate); - long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), - WASB_DOWNLOAD_LATENCY); - System.out.println("Download latency: " + downloadLatency); - expectedLatency = downloadDurationMs; // We're downloading less than a block. - assertTrue("The download latency " + downloadLatency + - " should be greater than zero now that I've just downloaded a file.", - downloadLatency > 0); - assertTrue("The download latency " + downloadLatency + - " is more than the expected range of around " + expectedLatency + - " milliseconds that the unit test observed. 
This should never be" + - " the case since the test overestimates the latency by looking at " + - " end-to-end time instead of just block download time.", - downloadLatency <= expectedLatency); - - assertNoErrors(); - } - - @Test - public void testMetricsOnBigFileCreateRead() throws Exception { - long base = getBaseWebResponses(); - - assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation())); - - Path filePath = new Path("/metricsTest_webResponses"); - final int FILE_SIZE = 100 * 1024 * 1024; - - // Suppress auto-update of bandwidth metrics so we get - // to update them exactly when we want to. - getBandwidthGaugeUpdater().suppressAutoUpdate(); - - // Create a file - OutputStream outputStream = fs.create(filePath); - outputStream.write(new byte[FILE_SIZE]); - outputStream.close(); - - // The exact number of requests/responses that happen to create a file - // can vary - at the time of writing this code it takes 34 - // requests/responses for the 100 MB file, - // plus the initial container check request, but that - // can very easily change in the future. Just assert that we do roughly - // more than 20 but less than 50. - logOpResponseCount("Creating a 100 MB file", base); - base = assertWebResponsesInRange(base, 20, 50); - getBandwidthGaugeUpdater().triggerUpdate(true); - long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation()); - assertTrue("The total bytes written " + totalBytesWritten + - " is pretty far from the expected range of around " + FILE_SIZE + - " bytes plus a little overhead.", - totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2)); - long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE); - System.out.println("Upload rate: " + uploadRate + " bytes/second."); - long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), - WASB_UPLOAD_LATENCY); - System.out.println("Upload latency: " + uploadLatency); - assertTrue("The upload latency " + uploadLatency + - " should be greater than zero now that I've just uploaded a file.", - uploadLatency > 0); - - // Read the file - InputStream inputStream = fs.open(filePath); - int count = 0; - while (inputStream.read() >= 0) { - count++; - } - inputStream.close(); - assertEquals(FILE_SIZE, count); - - // Again, exact number varies. At the time of writing this code - // it takes 27 request/responses, so just assert a rough range between - // 20 and 40. 
- logOpResponseCount("Reading a 100 MB file", base); - base = assertWebResponsesInRange(base, 20, 40); - getBandwidthGaugeUpdater().triggerUpdate(false); - long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation()); - assertEquals(FILE_SIZE, totalBytesRead); - long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE); - System.out.println("Download rate: " + downloadRate + " bytes/second."); - long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), - WASB_DOWNLOAD_LATENCY); - System.out.println("Download latency: " + downloadLatency); - assertTrue("The download latency " + downloadLatency + - " should be greater than zero now that I've just downloaded a file.", - downloadLatency > 0); - } - - @Test - public void testMetricsOnFileRename() throws Exception { - long base = getBaseWebResponses(); - - Path originalPath = new Path("/metricsTest_RenameStart"); - Path destinationPath = new Path("/metricsTest_RenameFinal"); - - // Create an empty file - assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED)); - assertTrue(fs.createNewFile(originalPath)); - logOpResponseCount("Creating an empty file", base); - base = assertWebResponsesInRange(base, 2, 20); - assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED)); - - // Rename the file - assertTrue(fs.rename(originalPath, destinationPath)); - // Varies: at the time of writing this code it takes 7 requests/responses. - logOpResponseCount("Renaming a file", base); - base = assertWebResponsesInRange(base, 2, 15); - - assertNoErrors(); - } - - @Test - public void testMetricsOnFileExistsDelete() throws Exception { - long base = getBaseWebResponses(); - - Path filePath = new Path("/metricsTest_delete"); - - // Check existence - assertFalse(fs.exists(filePath)); - // At the time of writing this code it takes 2 requests/responses to - // check existence, which seems excessive, plus initial request for - // container check, plus 2 ancestor checks only in the secure case. - logOpResponseCount("Checking file existence for non-existent file", base); - base = assertWebResponsesInRange(base, 1, 5); - - // Create an empty file - assertTrue(fs.createNewFile(filePath)); - base = getCurrentWebResponses(); - - // Check existence again - assertTrue(fs.exists(filePath)); - logOpResponseCount("Checking file existence for existent file", base); - base = assertWebResponsesInRange(base, 1, 4); - - // Delete the file - assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED)); - assertTrue(fs.delete(filePath, false)); - // At the time of writing this code it takes 4 requests/responses to - // delete, which seems excessive. Check for range 1-4 for now. 
- logOpResponseCount("Deleting a file", base); - base = assertWebResponsesInRange(base, 1, 4); - assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED)); - - assertNoErrors(); - } - - @Test - public void testMetricsOnDirRename() throws Exception { - long base = getBaseWebResponses(); - - Path originalDirName = new Path("/metricsTestDirectory_RenameStart"); - Path innerFileName = new Path(originalDirName, "innerFile"); - Path destDirName = new Path("/metricsTestDirectory_RenameFinal"); - - // Create an empty directory - assertTrue(fs.mkdirs(originalDirName)); - base = getCurrentWebResponses(); - - // Create an inner file - assertTrue(fs.createNewFile(innerFileName)); - base = getCurrentWebResponses(); - - // Rename the directory - assertTrue(fs.rename(originalDirName, destDirName)); - // At the time of writing this code it takes 11 requests/responses - // to rename the directory with one file. Check for range 1-20 for now. - logOpResponseCount("Renaming a directory", base); - base = assertWebResponsesInRange(base, 1, 20); - - assertNoErrors(); - } - - @Test - public void testClientErrorMetrics() throws Exception { - String fileName = "metricsTestFile_ClientError"; - Path filePath = new Path("/"+fileName); - final int FILE_SIZE = 100; - OutputStream outputStream = null; - String leaseID = null; - try { - // Create a file - outputStream = fs.create(filePath); - leaseID = testAccount.acquireShortLease(fileName); - try { - outputStream.write(new byte[FILE_SIZE]); - outputStream.close(); - assertTrue("Should've thrown", false); - } catch (AzureException ex) { - assertTrue("Unexpected exception: " + ex, - ex.getMessage().contains("lease")); - } - assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS)); - assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS)); - } finally { - if(leaseID != null){ - testAccount.releaseLease(leaseID, fileName); - } - IOUtils.closeStream(outputStream); - } - } - - private void logOpResponseCount(String opName, long base) { - System.out.println(opName + " took " + (getCurrentWebResponses() - base) + - " web responses to complete."); - } - - /** - * Gets (and asserts) the value of the wasb_web_responses counter just - * after the creation of the file system object. - */ - private long getBaseWebResponses() { - // The number of requests should start at 0 - return assertWebResponsesEquals(0, 0); - } - - /** - * Gets the current value of the wasb_web_responses counter. - */ - private long getCurrentWebResponses() { - return AzureMetricsTestUtil.getCurrentWebResponses(getInstrumentation()); - } - - /** - * Checks that the wasb_web_responses counter is at the given value. - * @param base The base value (before the operation of interest). - * @param expected The expected value for the operation of interest. - * @return The new base value now. - */ - private long assertWebResponsesEquals(long base, long expected) { - assertCounter(WASB_WEB_RESPONSES, base + expected, getMyMetrics()); - return base + expected; - } - - private void assertNoErrors() { - assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS)); - assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS)); - } - - /** - * Checks that the wasb_web_responses counter is in the given range. - * @param base The base value (before the operation of interest). 
-   * @param inclusiveLowerLimit The lower limit for what it should increase by.
-   * @param inclusiveUpperLimit The upper limit for what it should increase by.
-   * @return The new base value now.
-   */
-  private long assertWebResponsesInRange(long base,
-      long inclusiveLowerLimit,
-      long inclusiveUpperLimit) {
-    long currentResponses = getCurrentWebResponses();
-    long justOperation = currentResponses - base;
-    assertTrue(String.format(
-        "Web responses expected in range [%d, %d], but was %d.",
-        inclusiveLowerLimit, inclusiveUpperLimit, justOperation),
-        justOperation >= inclusiveLowerLimit &&
-        justOperation <= inclusiveUpperLimit);
-    return currentResponses;
-  }
-
-  /**
-   * Gets the metrics for the file system object.
-   * @return The metrics record.
-   */
-  private MetricsRecordBuilder getMyMetrics() {
-    return getMetrics(getInstrumentation());
-  }
-
-  private AzureFileSystemInstrumentation getInstrumentation() {
-    return ((NativeAzureFileSystem)fs).getInstrumentation();
-  }
-
-  /**
-   * A matcher class for asserting that we got a tag with a given
-   * value.
-   */
-  private static class TagMatcher extends TagExistsMatcher {
-    private final String tagValue;
-
-    public TagMatcher(String tagName, String tagValue) {
-      super(tagName);
-      this.tagValue = tagValue;
-    }
-
-    @Override
-    public boolean matches(MetricsTag toMatch) {
-      return toMatch.value().equals(tagValue);
-    }
-
-    @Override
-    public void describeTo(Description desc) {
-      super.describeTo(desc);
-      desc.appendText(" with value " + tagValue);
-    }
-  }
-
-  /**
-   * A matcher class for asserting that we got a tag with any value.
-   */
-  private static class TagExistsMatcher extends BaseMatcher<MetricsTag> {
-    private final String tagName;
-
-    public TagExistsMatcher(String tagName) {
-      this.tagName = tagName;
-    }
-
-    @Override
-    public boolean matches(Object toMatch) {
-      MetricsTag asTag = (MetricsTag)toMatch;
-      return asTag.name().equals(tagName) && matches(asTag);
-    }
-
-    protected boolean matches(MetricsTag toMatch) {
-      return true;
-    }
-
-    @Override
-    public void describeTo(Description desc) {
-      desc.appendText("Has tag " + tagName);
-    }
-  }
-
-  /**
-   * A matcher class for asserting that a long value is in a
-   * given range.
-   */
-  private static class InRange extends BaseMatcher<Long> {
-    private final long inclusiveLowerLimit;
-    private final long inclusiveUpperLimit;
-    private long obtained;
-
-    public InRange(long inclusiveLowerLimit, long inclusiveUpperLimit) {
-      this.inclusiveLowerLimit = inclusiveLowerLimit;
-      this.inclusiveUpperLimit = inclusiveUpperLimit;
-    }
-
-    @Override
-    public boolean matches(Object number) {
-      obtained = (Long)number;
-      return obtained >= inclusiveLowerLimit &&
-          obtained <= inclusiveUpperLimit;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("Between " + inclusiveLowerLimit +
-          " and " + inclusiveUpperLimit + " inclusively");
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
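
The idiom running through the deleted test above, snapshot a monotonic
counter, perform an operation, then assert that the delta landed in an
inclusive range, is worth keeping in mind when porting these cases to the
new integration tests. A minimal sketch of that pattern as a self-contained
JUnit 4 helper follows; the class and method names are illustrative only
and not part of this patch.

import java.util.function.LongSupplier;

import static org.junit.Assert.assertTrue;

/**
 * Illustrative sketch: the snapshot/operate/assert-delta pattern used by
 * assertWebResponsesInRange() in the deleted test, generalized to any
 * monotonic counter supplied as a LongSupplier.
 */
public final class CounterDeltaAssert {

  private CounterDeltaAssert() {
  }

  /**
   * Run the action, then assert the counter grew by a value in
   * [min, max], bounds inclusive.
   * @return the counter reading after the action, usable as the next base.
   */
  public static long assertDeltaInRange(LongSupplier counter,
      Runnable action, long min, long max) {
    long before = counter.getAsLong();   // snapshot the base value
    action.run();                        // the operation under test
    long after = counter.getAsLong();
    long delta = after - before;
    assertTrue(String.format(
        "Counter delta expected in range [%d, %d], but was %d.",
        min, max, delta),
        delta >= min && delta <= max);
    return after;
  }
}

Returning the post-operation reading mirrors how the original helper
returned the new base value, so successive assertions can be chained
without re-reading the counter.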
