http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java index 177477c..726b504 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java @@ -18,12 +18,6 @@ package org.apache.hadoop.fs.azure; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileNotFoundException; @@ -47,16 +41,18 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Test; -import org.apache.hadoop.fs.azure.AzureException; import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending; import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; +import static org.apache.hadoop.test.GenericTestUtils.*; + /* * Tests the Native Azure file system (WASB) against an actual blob store if * provided in the environment. @@ -71,15 +67,46 @@ public abstract class NativeAzureFileSystemBaseTest private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class); + protected NativeAzureFileSystem fs; + + @Override + public void setUp() throws Exception { + super.setUp(); + fs = getFileSystem(); + } + + /** + * Assert that a path does not exist. + * + * @param message message to include in the assertion failure message + * @param path path in the filesystem + * @throws IOException IO problems + */ + public void assertPathDoesNotExist(String message, + Path path) throws IOException { + ContractTestUtils.assertPathDoesNotExist(fs, message, path); + } + + /** + * Assert that a path exists. + * + * @param message message to include in the assertion failure message + * @param path path in the filesystem + * @throws IOException IO problems + */ + public void assertPathExists(String message, + Path path) throws IOException { + ContractTestUtils.assertPathExists(fs, message, path); + } @Test public void testCheckingNonExistentOneLetterFile() throws Exception { - assertFalse(fs.exists(new Path("/a"))); + assertPathDoesNotExist("one letter file", new Path("/a")); } @Test public void testStoreRetrieveFile() throws Exception { - Path testFile = new Path("unit-test-file"); + Path testFile = methodPath(); writeString(testFile, "Testing"); assertTrue(fs.exists(testFile)); FileStatus status = fs.getFileStatus(testFile); @@ -93,7 +120,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testStoreDeleteFolder() throws Exception { - Path testFolder = new Path("storeDeleteFolder"); + Path testFolder = methodPath(); assertFalse(fs.exists(testFolder)); assertTrue(fs.mkdirs(testFolder)); assertTrue(fs.exists(testFolder)); @@ -105,22 +132,22 @@ public abstract class NativeAzureFileSystemBaseTest assertEquals(new FsPermission((short) 0755), status.getPermission()); Path innerFile = new Path(testFolder, "innerFile"); assertTrue(fs.createNewFile(innerFile)); - assertTrue(fs.exists(innerFile)); + assertPathExists("inner file", innerFile); assertTrue(fs.delete(testFolder, true)); - assertFalse(fs.exists(innerFile)); - assertFalse(fs.exists(testFolder)); + assertPathDoesNotExist("inner file", innerFile); + assertPathDoesNotExist("testFolder", testFolder); } @Test public void testFileOwnership() throws Exception { - Path testFile = new Path("ownershipTestFile"); + Path testFile = methodPath(); writeString(testFile, "Testing"); testOwnership(testFile); } @Test public void testFolderOwnership() throws Exception { - Path testFolder = new Path("ownershipTestFolder"); + Path testFolder = methodPath(); fs.mkdirs(testFolder); testOwnership(testFolder); } @@ -147,7 +174,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testFilePermissions() throws Exception { - Path testFile = new Path("permissionTestFile"); + Path testFile = methodPath(); FsPermission permission = FsPermission.createImmutable((short) 644); createEmptyFile(testFile, permission); FileStatus ret = fs.getFileStatus(testFile); @@ -157,7 +184,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testFolderPermissions() throws Exception { - Path testFolder = new Path("permissionTestFolder"); + Path testFolder = methodPath(); FsPermission permission = FsPermission.createImmutable((short) 644); fs.mkdirs(testFolder, permission); FileStatus ret = fs.getFileStatus(testFolder); @@ -176,9 +203,9 @@ public abstract class NativeAzureFileSystemBaseTest createEmptyFile(testFile, permission); FsPermission rootPerm = fs.getFileStatus(firstDir.getParent()).getPermission(); FsPermission inheritPerm = FsPermission.createImmutable((short)(rootPerm.toShort() | 0300)); - assertTrue(fs.exists(testFile)); - assertTrue(fs.exists(firstDir)); - assertTrue(fs.exists(middleDir)); + assertPathExists("test file", testFile); + assertPathExists("firstDir", firstDir); + assertPathExists("middleDir", middleDir); // verify that the indirectly created directory inherited its permissions from the root directory FileStatus directoryStatus = fs.getFileStatus(middleDir); assertTrue(directoryStatus.isDirectory()); @@ -188,7 +215,7 @@ public abstract class NativeAzureFileSystemBaseTest assertFalse(fileStatus.isDirectory()); assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission()); assertTrue(fs.delete(firstDir, true)); - assertFalse(fs.exists(testFile)); + assertPathDoesNotExist("deleted file", testFile); // An alternative test scenario would've been to delete the file first, // and then check for the existence of the upper folders still. But that @@ -264,7 +291,7 @@ public abstract class NativeAzureFileSystemBaseTest assertTrue(fs.delete(new Path("deep"), true)); } - private static enum RenameFolderVariation { + private enum RenameFolderVariation { CreateFolderAndInnerFile, CreateJustInnerFile, CreateJustFolder } @@ -303,10 +330,10 @@ public abstract class NativeAzureFileSystemBaseTest localFs.delete(localFilePath, true); try { writeString(localFs, localFilePath, "Testing"); - Path dstPath = new Path("copiedFromLocal"); + Path dstPath = methodPath(); assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false, fs.getConf())); - assertTrue(fs.exists(dstPath)); + assertPathExists("coied from local", dstPath); assertEquals("Testing", readString(fs, dstPath)); fs.delete(dstPath, true); } finally { @@ -423,32 +450,32 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testReadingDirectoryAsFile() throws Exception { - Path dir = new Path("/x"); + Path dir = methodPath(); assertTrue(fs.mkdirs(dir)); try { fs.open(dir).close(); assertTrue("Should've thrown", false); } catch (FileNotFoundException ex) { - assertEquals("/x is a directory not a file.", ex.getMessage()); + assertExceptionContains("a directory not a file.", ex); } } @Test public void testCreatingFileOverDirectory() throws Exception { - Path dir = new Path("/x"); + Path dir = methodPath(); assertTrue(fs.mkdirs(dir)); try { fs.create(dir).close(); assertTrue("Should've thrown", false); } catch (IOException ex) { - assertEquals("Cannot create file /x; already exists as a directory.", - ex.getMessage()); + assertExceptionContains("Cannot create file", ex); + assertExceptionContains("already exists as a directory", ex); } } @Test public void testInputStreamReadWithZeroSizeBuffer() throws Exception { - Path newFile = new Path("zeroSizeRead"); + Path newFile = methodPath(); OutputStream output = fs.create(newFile); output.write(10); output.close(); @@ -460,7 +487,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws Exception { - Path newFile = new Path("eofRead"); + Path newFile = methodPath(); OutputStream output = fs.create(newFile); output.write(10); output.close(); @@ -482,7 +509,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws Exception { - Path newFile = new Path("eofRead2"); + Path newFile = methodPath(); OutputStream output = fs.create(newFile); byte[] outputBuff = new byte[97331]; for(int i = 0; i < outputBuff.length; ++i) { @@ -508,7 +535,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception { - Path newFile = new Path("eofRead3"); + Path newFile = methodPath(); OutputStream output = fs.create(newFile); output.write(10); output.close(); @@ -525,7 +552,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testSetPermissionOnFile() throws Exception { - Path newFile = new Path("testPermission"); + Path newFile = methodPath(); OutputStream output = fs.create(newFile); output.write(13); output.close(); @@ -540,14 +567,14 @@ public abstract class NativeAzureFileSystemBaseTest // Don't check the file length for page blobs. Only block blobs // provide the actual length of bytes written. - if (!(this instanceof TestNativeAzureFSPageBlobLive)) { + if (!(this instanceof ITestNativeAzureFSPageBlobLive)) { assertEquals(1, newStatus.getLen()); } } @Test public void testSetPermissionOnFolder() throws Exception { - Path newFolder = new Path("testPermission"); + Path newFolder = methodPath(); assertTrue(fs.mkdirs(newFolder)); FsPermission newPermission = new FsPermission((short) 0600); fs.setPermission(newFolder, newPermission); @@ -559,7 +586,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testSetOwnerOnFile() throws Exception { - Path newFile = new Path("testOwner"); + Path newFile = methodPath(); OutputStream output = fs.create(newFile); output.write(13); output.close(); @@ -571,7 +598,7 @@ public abstract class NativeAzureFileSystemBaseTest // File length is only reported to be the size of bytes written to the file for block blobs. // So only check it for block blobs, not page blobs. - if (!(this instanceof TestNativeAzureFSPageBlobLive)) { + if (!(this instanceof ITestNativeAzureFSPageBlobLive)) { assertEquals(1, newStatus.getLen()); } fs.setOwner(newFile, null, "newGroup"); @@ -583,7 +610,7 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testSetOwnerOnFolder() throws Exception { - Path newFolder = new Path("testOwner"); + Path newFolder = methodPath(); assertTrue(fs.mkdirs(newFolder)); fs.setOwner(newFolder, "newUser", null); FileStatus newStatus = fs.getFileStatus(newFolder); @@ -594,21 +621,21 @@ public abstract class NativeAzureFileSystemBaseTest @Test public void testModifiedTimeForFile() throws Exception { - Path testFile = new Path("testFile"); + Path testFile = methodPath(); fs.create(testFile).close(); testModifiedTime(testFile); } @Test public void testModifiedTimeForFolder() throws Exception { - Path testFolder = new Path("testFolder"); + Path testFolder = methodPath(); assertTrue(fs.mkdirs(testFolder)); testModifiedTime(testFolder); } @Test public void testFolderLastModifiedTime() throws Exception { - Path parentFolder = new Path("testFolder"); + Path parentFolder = methodPath(); Path innerFile = new Path(parentFolder, "innerfile"); assertTrue(fs.mkdirs(parentFolder)); @@ -983,7 +1010,7 @@ public abstract class NativeAzureFileSystemBaseTest // Make sure rename pending file is gone. FileStatus[] listed = fs.listStatus(new Path("/")); - assertEquals(1, listed.length); + assertEquals("Pending directory still found", 1, listed.length); assertTrue(listed[0].isDirectory()); } @@ -1681,7 +1708,7 @@ public abstract class NativeAzureFileSystemBaseTest assertTrue("Unanticipated exception", false); } } else { - assertTrue("Unknown thread name", false); + fail("Unknown thread name"); } LOG.info(name + " is exiting.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt deleted file mode 100644 index 54ba4d8..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt +++ /dev/null @@ -1,22 +0,0 @@ -======================================================================== -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -========================================================================= - -In order to run Windows Azure Storage Blob (WASB) unit tests against a live -Azure Storage account, you need to provide test account details in a configuration -file called azure-test.xml. See hadoop-tools/hadoop-azure/README.txt for details -on configuration, and how to run the tests. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java deleted file mode 100644 index a10a366..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azure; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeNotNull; - -import java.io.*; -import java.util.Arrays; - -import org.apache.hadoop.fs.azure.AzureException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestAzureConcurrentOutOfBandIo { - - // Class constants. - static final int DOWNLOAD_BLOCK_SIZE = 8 * 1024 * 1024; - static final int UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024; - static final int BLOB_SIZE = 32 * 1024 * 1024; - - // Number of blocks to be written before flush. - static final int NUMBER_OF_BLOCKS = 2; - - protected AzureBlobStorageTestAccount testAccount; - - // Overridden TestCase methods. - @Before - public void setUp() throws Exception { - testAccount = AzureBlobStorageTestAccount.createOutOfBandStore( - UPLOAD_BLOCK_SIZE, DOWNLOAD_BLOCK_SIZE); - assumeNotNull(testAccount); - } - - @After - public void tearDown() throws Exception { - if (testAccount != null) { - testAccount.cleanup(); - testAccount = null; - } - } - - class DataBlockWriter implements Runnable { - - Thread runner; - AzureBlobStorageTestAccount writerStorageAccount; - String key; - boolean done = false; - - /** - * Constructor captures the test account. - * - * @param testAccount - */ - public DataBlockWriter(AzureBlobStorageTestAccount testAccount, String key) { - writerStorageAccount = testAccount; - this.key = key; - } - - /** - * Start writing blocks to Azure storage. - */ - public void startWriting() { - runner = new Thread(this); // Create the block writer thread. - runner.start(); // Start the block writer thread. - } - - /** - * Stop writing blocks to Azure storage. - */ - public void stopWriting() { - done = true; - } - - /** - * Implementation of the runnable interface. The run method is a tight loop - * which repeatedly updates the blob with a 4 MB block. - */ - public void run() { - byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE]; - - OutputStream outputStream = null; - - try { - for (int i = 0; !done; i++) { - // Write two 4 MB blocks to the blob. - // - outputStream = writerStorageAccount.getStore().storefile( - key, - new PermissionStatus("", "", FsPermission.getDefault()), - key); - - Arrays.fill(dataBlockWrite, (byte) (i % 256)); - for (int j = 0; j < NUMBER_OF_BLOCKS; j++) { - outputStream.write(dataBlockWrite); - } - - outputStream.flush(); - outputStream.close(); - } - } catch (AzureException e) { - System.out - .println("DatablockWriter thread encountered a storage exception." - + e.getMessage()); - } catch (IOException e) { - System.out - .println("DatablockWriter thread encountered an I/O exception." - + e.getMessage()); - } - } - } - - @Test - public void testReadOOBWrites() throws Exception { - - byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE]; - byte[] dataBlockRead = new byte[UPLOAD_BLOCK_SIZE]; - - // Write to blob to make sure it exists. - // - // Write five 4 MB blocks to the blob. To ensure there is data in the blob before - // reading. This eliminates the race between the reader and writer threads. - OutputStream outputStream = testAccount.getStore().storefile( - "WASB_String.txt", - new PermissionStatus("", "", FsPermission.getDefault()), - "WASB_String.txt"); - Arrays.fill(dataBlockWrite, (byte) 255); - for (int i = 0; i < NUMBER_OF_BLOCKS; i++) { - outputStream.write(dataBlockWrite); - } - - outputStream.flush(); - outputStream.close(); - - // Start writing blocks to Azure store using the DataBlockWriter thread. - DataBlockWriter writeBlockTask = new DataBlockWriter(testAccount, - "WASB_String.txt"); - writeBlockTask.startWriting(); - int count = 0; - InputStream inputStream = null; - - for (int i = 0; i < 5; i++) { - try { - inputStream = testAccount.getStore().retrieve("WASB_String.txt"); - count = 0; - int c = 0; - - while (c >= 0) { - c = inputStream.read(dataBlockRead, 0, UPLOAD_BLOCK_SIZE); - if (c < 0) { - break; - } - - // Counting the number of bytes. - count += c; - } - } catch (IOException e) { - System.out.println(e.getCause().toString()); - e.printStackTrace(); - fail(); - } - - // Close the stream. - if (null != inputStream){ - inputStream.close(); - } - } - - // Stop writing blocks. - writeBlockTask.stopWriting(); - - // Validate that a block was read. - assertEquals(NUMBER_OF_BLOCKS * UPLOAD_BLOCK_SIZE, count); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java deleted file mode 100644 index 687b785..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azure; - -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeNotNull; - -/** - * Extends TestAzureConcurrentOutOfBandIo in order to run testReadOOBWrites with secure mode - * (fs.azure.secure.mode) both enabled and disabled. - */ -public class TestAzureConcurrentOutOfBandIoWithSecureMode extends TestAzureConcurrentOutOfBandIo { - - // Overridden TestCase methods. - @Before - @Override - public void setUp() throws Exception { - testAccount = AzureBlobStorageTestAccount.createOutOfBandStore( - UPLOAD_BLOCK_SIZE, DOWNLOAD_BLOCK_SIZE, true); - assumeNotNull(testAccount); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java deleted file mode 100644 index c985224..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azure; - -import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.NO_ACCESS_TO_CONTAINER_MSG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeNotNull; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URI; -import java.util.Arrays; -import java.util.HashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext; -import org.apache.hadoop.test.GenericTestUtils; - -import org.junit.Test; - -import com.microsoft.azure.storage.OperationContext; -import com.microsoft.azure.storage.SendingRequestEvent; -import com.microsoft.azure.storage.StorageEvent; - -public class TestAzureFileSystemErrorConditions { - private static final int ALL_THREE_FILE_SIZE = 1024; - - @Test - public void testNoInitialize() throws Exception { - AzureNativeFileSystemStore store = new AzureNativeFileSystemStore(); - boolean passed = false; - try { - store.retrieveMetadata("foo"); - passed = true; - } catch (AssertionError e) { - } - assertFalse( - "Doing an operation on the store should throw if not initalized.", - passed); - } - - /** - * Try accessing an unauthorized or non-existent (treated the same) container - * from WASB. - */ - @Test - public void testAccessUnauthorizedPublicContainer() throws Exception { - final String container = "nonExistentContainer"; - final String account = "hopefullyNonExistentAccount"; - Path noAccessPath = new Path( - "wasb://" + container + "@" + account + "/someFile"); - NativeAzureFileSystem.suppressRetryPolicy(); - try { - FileSystem.get(noAccessPath.toUri(), new Configuration()) - .open(noAccessPath); - assertTrue("Should've thrown.", false); - } catch (AzureException ex) { - GenericTestUtils.assertExceptionContains( - String.format(NO_ACCESS_TO_CONTAINER_MSG, account, container), ex); - } finally { - NativeAzureFileSystem.resumeRetryPolicy(); - } - } - - @Test - public void testAccessContainerWithWrongVersion() throws Exception { - AzureNativeFileSystemStore store = new AzureNativeFileSystemStore(); - MockStorageInterface mockStorage = new MockStorageInterface(); - store.setAzureStorageInteractionLayer(mockStorage); - FileSystem fs = new NativeAzureFileSystem(store); - try { - Configuration conf = new Configuration(); - AzureBlobStorageTestAccount.setMockAccountKey(conf); - HashMap<String, String> metadata = new HashMap<String, String>(); - metadata.put(AzureNativeFileSystemStore.VERSION_METADATA_KEY, - "2090-04-05"); // It's from the future! - mockStorage.addPreExistingContainer( - AzureBlobStorageTestAccount.getMockContainerUri(), metadata); - - boolean passed = false; - try { - fs.initialize(new URI(AzureBlobStorageTestAccount.MOCK_WASB_URI), conf); - fs.listStatus(new Path("/")); - passed = true; - } catch (AzureException ex) { - assertTrue("Unexpected exception message: " + ex, - ex.getMessage().contains("unsupported version: 2090-04-05.")); - } - assertFalse("Should've thrown an exception because of the wrong version.", - passed); - } finally { - fs.close(); - } - } - - private interface ConnectionRecognizer { - boolean isTargetConnection(HttpURLConnection connection); - } - - private class TransientErrorInjector extends StorageEvent<SendingRequestEvent> { - final ConnectionRecognizer connectionRecognizer; - private boolean injectedErrorOnce = false; - - public TransientErrorInjector(ConnectionRecognizer connectionRecognizer) { - this.connectionRecognizer = connectionRecognizer; - } - - @Override - public void eventOccurred(SendingRequestEvent eventArg) { - HttpURLConnection connection = (HttpURLConnection)eventArg.getConnectionObject(); - if (!connectionRecognizer.isTargetConnection(connection)) { - return; - } - if (!injectedErrorOnce) { - connection.setReadTimeout(1); - connection.disconnect(); - injectedErrorOnce = true; - } - } - } - - private void injectTransientError(NativeAzureFileSystem fs, - final ConnectionRecognizer connectionRecognizer) { - fs.getStore().addTestHookToOperationContext(new TestHookOperationContext() { - @Override - public OperationContext modifyOperationContext(OperationContext original) { - original.getSendingRequestEventHandler().addListener( - new TransientErrorInjector(connectionRecognizer)); - return original; - } - }); - } - - @Test - public void testTransientErrorOnDelete() throws Exception { - // Need to do this test against a live storage account - AzureBlobStorageTestAccount testAccount = - AzureBlobStorageTestAccount.create(); - assumeNotNull(testAccount); - try { - NativeAzureFileSystem fs = testAccount.getFileSystem(); - injectTransientError(fs, new ConnectionRecognizer() { - @Override - public boolean isTargetConnection(HttpURLConnection connection) { - return connection.getRequestMethod().equals("DELETE"); - } - }); - Path testFile = new Path("/a/b"); - assertTrue(fs.createNewFile(testFile)); - assertTrue(fs.rename(testFile, new Path("/x"))); - } finally { - testAccount.cleanup(); - } - } - - private void writeAllThreeFile(NativeAzureFileSystem fs, Path testFile) - throws IOException { - byte[] buffer = new byte[ALL_THREE_FILE_SIZE]; - Arrays.fill(buffer, (byte)3); - OutputStream stream = fs.create(testFile); - stream.write(buffer); - stream.close(); - } - - private void readAllThreeFile(NativeAzureFileSystem fs, Path testFile) - throws IOException { - byte[] buffer = new byte[ALL_THREE_FILE_SIZE]; - InputStream inStream = fs.open(testFile); - assertEquals(buffer.length, - inStream.read(buffer, 0, buffer.length)); - inStream.close(); - for (int i = 0; i < buffer.length; i++) { - assertEquals(3, buffer[i]); - } - } - - @Test - public void testTransientErrorOnCommitBlockList() throws Exception { - // Need to do this test against a live storage account - AzureBlobStorageTestAccount testAccount = - AzureBlobStorageTestAccount.create(); - assumeNotNull(testAccount); - try { - NativeAzureFileSystem fs = testAccount.getFileSystem(); - injectTransientError(fs, new ConnectionRecognizer() { - @Override - public boolean isTargetConnection(HttpURLConnection connection) { - return connection.getRequestMethod().equals("PUT") - && connection.getURL().getQuery() != null - && connection.getURL().getQuery().contains("blocklist"); - } - }); - Path testFile = new Path("/a/b"); - writeAllThreeFile(fs, testFile); - readAllThreeFile(fs, testFile); - } finally { - testAccount.cleanup(); - } - } - - @Test - public void testTransientErrorOnRead() throws Exception { - // Need to do this test against a live storage account - AzureBlobStorageTestAccount testAccount = - AzureBlobStorageTestAccount.create(); - assumeNotNull(testAccount); - try { - NativeAzureFileSystem fs = testAccount.getFileSystem(); - Path testFile = new Path("/a/b"); - writeAllThreeFile(fs, testFile); - injectTransientError(fs, new ConnectionRecognizer() { - @Override - public boolean isTargetConnection(HttpURLConnection connection) { - return connection.getRequestMethod().equals("GET"); - } - }); - readAllThreeFile(fs, testFile); - } finally { - testAccount.cleanup(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java deleted file mode 100644 index ea17b62..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azure; - -import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_CHECK_BLOCK_MD5; -import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_STORE_BLOB_MD5; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeNotNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.util.Arrays; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext; -import org.junit.After; -import org.junit.Test; - -import com.microsoft.azure.storage.Constants; -import com.microsoft.azure.storage.OperationContext; -import com.microsoft.azure.storage.ResponseReceivedEvent; -import com.microsoft.azure.storage.StorageErrorCodeStrings; -import com.microsoft.azure.storage.StorageEvent; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.BlockEntry; -import com.microsoft.azure.storage.blob.BlockSearchMode; -import com.microsoft.azure.storage.blob.CloudBlockBlob; -import com.microsoft.azure.storage.core.Base64; - -/** - * Test that we do proper data integrity validation with MD5 checks as - * configured. - */ -public class TestBlobDataValidation { - private AzureBlobStorageTestAccount testAccount; - - @After - public void tearDown() throws Exception { - if (testAccount != null) { - testAccount.cleanup(); - testAccount = null; - } - } - - /** - * Test that by default we don't store the blob-level MD5. - */ - @Test - public void testBlobMd5StoreOffByDefault() throws Exception { - testAccount = AzureBlobStorageTestAccount.create(); - testStoreBlobMd5(false); - } - - /** - * Test that we get blob-level MD5 storage and validation if we specify that - * in the configuration. - */ - @Test - public void testStoreBlobMd5() throws Exception { - Configuration conf = new Configuration(); - conf.setBoolean(KEY_STORE_BLOB_MD5, true); - testAccount = AzureBlobStorageTestAccount.create(conf); - testStoreBlobMd5(true); - } - - private void testStoreBlobMd5(boolean expectMd5Stored) throws Exception { - assumeNotNull(testAccount); - // Write a test file. - String testFileKey = "testFile"; - Path testFilePath = new Path("/" + testFileKey); - OutputStream outStream = testAccount.getFileSystem().create(testFilePath); - outStream.write(new byte[] { 5, 15 }); - outStream.close(); - - // Check that we stored/didn't store the MD5 field as configured. - CloudBlockBlob blob = testAccount.getBlobReference(testFileKey); - blob.downloadAttributes(); - String obtainedMd5 = blob.getProperties().getContentMD5(); - if (expectMd5Stored) { - assertNotNull(obtainedMd5); - } else { - assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5); - } - - // Mess with the content so it doesn't match the MD5. - String newBlockId = Base64.encode(new byte[] { 55, 44, 33, 22 }); - blob.uploadBlock(newBlockId, - new ByteArrayInputStream(new byte[] { 6, 45 }), 2); - blob.commitBlockList(Arrays.asList(new BlockEntry[] { new BlockEntry( - newBlockId, BlockSearchMode.UNCOMMITTED) })); - - // Now read back the content. If we stored the MD5 for the blob content - // we should get a data corruption error. - InputStream inStream = testAccount.getFileSystem().open(testFilePath); - try { - byte[] inBuf = new byte[100]; - while (inStream.read(inBuf) > 0){ - //nothing; - } - inStream.close(); - if (expectMd5Stored) { - fail("Should've thrown because of data corruption."); - } - } catch (IOException ex) { - if (!expectMd5Stored) { - throw ex; - } - StorageException cause = (StorageException)ex.getCause(); - assertNotNull(cause); - assertEquals("Unexpected cause: " + cause, - StorageErrorCodeStrings.INVALID_MD5, cause.getErrorCode()); - } - } - - /** - * Test that by default we check block-level MD5. - */ - @Test - public void testCheckBlockMd5() throws Exception { - testAccount = AzureBlobStorageTestAccount.create(); - testCheckBlockMd5(true); - } - - /** - * Test that we don't check block-level MD5 if we specify that in the - * configuration. - */ - @Test - public void testDontCheckBlockMd5() throws Exception { - Configuration conf = new Configuration(); - conf.setBoolean(KEY_CHECK_BLOCK_MD5, false); - testAccount = AzureBlobStorageTestAccount.create(conf); - testCheckBlockMd5(false); - } - - /** - * Connection inspector to check that MD5 fields for content is set/not set as - * expected. - */ - private static class ContentMD5Checker extends - StorageEvent<ResponseReceivedEvent> { - private final boolean expectMd5; - - public ContentMD5Checker(boolean expectMd5) { - this.expectMd5 = expectMd5; - } - - @Override - public void eventOccurred(ResponseReceivedEvent eventArg) { - HttpURLConnection connection = (HttpURLConnection) eventArg - .getConnectionObject(); - if (isGetRange(connection)) { - checkObtainedMd5(connection - .getHeaderField(Constants.HeaderConstants.CONTENT_MD5)); - } else if (isPutBlock(connection)) { - checkObtainedMd5(connection - .getRequestProperty(Constants.HeaderConstants.CONTENT_MD5)); - } - } - - private void checkObtainedMd5(String obtainedMd5) { - if (expectMd5) { - assertNotNull(obtainedMd5); - } else { - assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5); - } - } - - private static boolean isPutBlock(HttpURLConnection connection) { - return connection.getRequestMethod().equals("PUT") - && connection.getURL().getQuery() != null - && connection.getURL().getQuery().contains("blockid"); - } - - private static boolean isGetRange(HttpURLConnection connection) { - return connection.getRequestMethod().equals("GET") - && connection - .getHeaderField(Constants.HeaderConstants.STORAGE_RANGE_HEADER) != null; - } - } - - private void testCheckBlockMd5(final boolean expectMd5Checked) - throws Exception { - assumeNotNull(testAccount); - Path testFilePath = new Path("/testFile"); - - // Add a hook to check that for GET/PUT requests we set/don't set - // the block-level MD5 field as configured. I tried to do clever - // testing by also messing with the raw data to see if we actually - // validate the data as expected, but the HttpURLConnection wasn't - // pluggable enough for me to do that. - testAccount.getFileSystem().getStore() - .addTestHookToOperationContext(new TestHookOperationContext() { - @Override - public OperationContext modifyOperationContext( - OperationContext original) { - original.getResponseReceivedEventHandler().addListener( - new ContentMD5Checker(expectMd5Checked)); - return original; - } - }); - - OutputStream outStream = testAccount.getFileSystem().create(testFilePath); - outStream.write(new byte[] { 5, 15 }); - outStream.close(); - - InputStream inStream = testAccount.getFileSystem().open(testFilePath); - byte[] inBuf = new byte[100]; - while (inStream.read(inBuf) > 0){ - //nothing; - } - inStream.close(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java index 6c49926..30c1028 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azure; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - import java.io.Closeable; import java.io.IOException; import java.net.URI; @@ -42,7 +37,7 @@ import org.junit.Test; /** * Tests that we put the correct metadata on blobs created through WASB. */ -public class TestBlobMetadata { +public class TestBlobMetadata extends AbstractWasbTestWithTimeout { private AzureBlobStorageTestAccount testAccount; private FileSystem fs; private InMemoryBlockBlobStore backingStore; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java index 07d4ebc..aca5f81 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java @@ -33,9 +33,6 @@ import org.junit.Test; import java.net.HttpURLConnection; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertEquals; - /** * Tests for <code>BlobOperationDescriptor</code>. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java deleted file mode 100644 index afb16ef..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azure; - -import java.io.*; -import java.util.*; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; - -import junit.framework.*; - -import org.junit.Test; - - -/** - * A simple benchmark to find out the difference in speed between block - * and page blobs. - */ -public class TestBlobTypeSpeedDifference extends TestCase { - /** - * Writes data to the given stream of the given size, flushing every - * x bytes. - */ - private static void writeTestFile(OutputStream writeStream, - long size, long flushInterval) throws IOException { - int bufferSize = (int) Math.min(1000, flushInterval); - byte[] buffer = new byte[bufferSize]; - Arrays.fill(buffer, (byte) 7); - int bytesWritten = 0; - int bytesUnflushed = 0; - while (bytesWritten < size) { - int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten); - writeStream.write(buffer, 0, numberToWrite); - bytesWritten += numberToWrite; - bytesUnflushed += numberToWrite; - if (bytesUnflushed >= flushInterval) { - writeStream.flush(); - bytesUnflushed = 0; - } - } - } - - private static class TestResult { - final long timeTakenInMs; - final long totalNumberOfRequests; - - TestResult(long timeTakenInMs, long totalNumberOfRequests) { - this.timeTakenInMs = timeTakenInMs; - this.totalNumberOfRequests = totalNumberOfRequests; - } - } - - /** - * Writes data to the given file of the given size, flushing every - * x bytes. Measure performance of that and return it. - */ - private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path, - long size, long flushInterval) throws IOException { - AzureFileSystemInstrumentation instrumentation = - fs.getInstrumentation(); - long initialRequests = instrumentation.getCurrentWebResponses(); - Date start = new Date(); - OutputStream output = fs.create(path); - writeTestFile(output, size, flushInterval); - output.close(); - long finalRequests = instrumentation.getCurrentWebResponses(); - return new TestResult(new Date().getTime() - start.getTime(), - finalRequests - initialRequests); - } - - /** - * Writes data to a block blob of the given size, flushing every - * x bytes. Measure performance of that and return it. - */ - private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs, - long size, long flushInterval) throws IOException { - return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval); - } - - /** - * Writes data to a page blob of the given size, flushing every - * x bytes. Measure performance of that and return it. - */ - private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs, - long size, long flushInterval) throws IOException { - return writeTestFile(fs, - AzureBlobStorageTestAccount.pageBlobPath("pageBlob"), - size, flushInterval); - } - - /** - * Runs the benchmark over a small 10 KB file, flushing every 500 bytes. - */ - @Test - public void testTenKbFileFrequentFlush() throws Exception { - AzureBlobStorageTestAccount testAccount = - AzureBlobStorageTestAccount.create(); - if (testAccount == null) { - return; - } - try { - testForSizeAndFlushInterval(testAccount.getFileSystem(), 10 * 1000, 500); - } finally { - testAccount.cleanup(); - } - } - - /** - * Runs the benchmark for the given file size and flush frequency. - */ - private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs, - final long size, final long flushInterval) throws IOException { - for (int i = 0; i < 5; i++) { - TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval); - System.out.printf( - "Page blob upload took %d ms. Total number of requests: %d.\n", - pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests); - TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval); - System.out.printf( - "Block blob upload took %d ms. Total number of requests: %d.\n", - blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests); - } - } - - /** - * Runs the benchmark for the given file size and flush frequency from the - * command line. - */ - public static void main(String argv[]) throws Exception { - Configuration conf = new Configuration(); - long size = 10 * 1000 * 1000; - long flushInterval = 2000; - if (argv.length > 0) { - size = Long.parseLong(argv[0]); - } - if (argv.length > 1) { - flushInterval = Long.parseLong(argv[1]); - } - testForSizeAndFlushInterval((NativeAzureFileSystem)FileSystem.get(conf), - size, flushInterval); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java deleted file mode 100644 index 0ae4012..0000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java +++ /dev/null @@ -1,875 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azure; - -import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.EnumSet; -import java.util.Random; -import java.util.concurrent.Callable; - -import org.junit.FixMethodOrder; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.junit.runners.MethodSorters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeNotNull; - -import static org.apache.hadoop.test.LambdaTestUtils.*; - -/** - * Test semantics and performance of the original block blob input stream - * (KEY_INPUT_STREAM_VERSION=1) and the new - * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2). - */ -@FixMethodOrder(MethodSorters.NAME_ASCENDING) - -public class TestBlockBlobInputStream extends AbstractWasbTestBase { - private static final Logger LOG = LoggerFactory.getLogger( - TestBlockBlobInputStream.class); - private static final int KILOBYTE = 1024; - private static final int MEGABYTE = KILOBYTE * KILOBYTE; - private static final int TEST_FILE_SIZE = 6 * MEGABYTE; - private static final Path TEST_FILE_PATH = new Path( - "TestBlockBlobInputStream.txt"); - - private AzureBlobStorageTestAccount accountUsingInputStreamV1; - private AzureBlobStorageTestAccount accountUsingInputStreamV2; - private long testFileLength; - - /** - * Long test timeout. - */ - @Rule - public Timeout testTimeout = new Timeout(10 * 60 * 1000); - private FileStatus testFileStatus; - private Path hugefile; - - @Override - public void setUp() throws Exception { - super.setUp(); - Configuration conf = new Configuration(); - conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1); - - accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create( - "testblockblobinputstream", - EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), - conf, - true); - - accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create( - "testblockblobinputstream", - EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), - null, - true); - - assumeNotNull(accountUsingInputStreamV1); - assumeNotNull(accountUsingInputStreamV2); - hugefile = fs.makeQualified(TEST_FILE_PATH); - try { - testFileStatus = fs.getFileStatus(TEST_FILE_PATH); - testFileLength = testFileStatus.getLen(); - } catch (FileNotFoundException e) { - // file doesn't exist - testFileLength = 0; - } - } - - @Override - protected AzureBlobStorageTestAccount createTestAccount() throws Exception { - Configuration conf = new Configuration(); - conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1); - - accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create( - "testblockblobinputstream", - EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), - conf, - true); - - accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create( - "testblockblobinputstream", - EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), - null, - true); - - assumeNotNull(accountUsingInputStreamV1); - assumeNotNull(accountUsingInputStreamV2); - return accountUsingInputStreamV1; - } - - /** - * Create a test file by repeating the characters in the alphabet. - * @throws IOException - */ - private void createTestFileAndSetLength() throws IOException { - FileSystem fs = accountUsingInputStreamV1.getFileSystem(); - - // To reduce test run time, the test file can be reused. - if (fs.exists(TEST_FILE_PATH)) { - testFileStatus = fs.getFileStatus(TEST_FILE_PATH); - testFileLength = testFileStatus.getLen(); - LOG.info("Reusing test file: {}", testFileStatus); - return; - } - - int sizeOfAlphabet = ('z' - 'a' + 1); - byte[] buffer = new byte[26 * KILOBYTE]; - char character = 'a'; - for (int i = 0; i < buffer.length; i++) { - buffer[i] = (byte) character; - character = (character == 'z') ? 'a' : (char) ((int) character + 1); - } - - LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH, - TEST_FILE_SIZE); - ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - - try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { - int bytesWritten = 0; - while (bytesWritten < TEST_FILE_SIZE) { - outputStream.write(buffer); - bytesWritten += buffer.length; - } - LOG.info("Closing stream {}", outputStream); - ContractTestUtils.NanoTimer closeTimer - = new ContractTestUtils.NanoTimer(); - outputStream.close(); - closeTimer.end("time to close() output stream"); - } - timer.end("time to write %d KB", TEST_FILE_SIZE / 1024); - testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen(); - } - - void assumeHugeFileExists() throws IOException { - ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile); - FileStatus status = fs.getFileStatus(hugefile); - ContractTestUtils.assertIsFile(hugefile, status); - assertTrue("File " + hugefile + " is empty", status.getLen() > 0); - } - - /** - * Calculate megabits per second from the specified values for bytes and - * milliseconds. - * @param bytes The number of bytes. - * @param milliseconds The number of milliseconds. - * @return The number of megabits per second. - */ - private static double toMbps(long bytes, long milliseconds) { - return bytes / 1000.0 * 8 / milliseconds; - } - - @Test - public void test_0100_CreateHugeFile() throws IOException { - createTestFileAndSetLength(); - } - - @Test - public void test_0200_BasicReadTest() throws Exception { - assumeHugeFileExists(); - - try ( - FSDataInputStream inputStreamV1 - = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH); - - FSDataInputStream inputStreamV2 - = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH); - ) { - byte[] bufferV1 = new byte[3 * MEGABYTE]; - byte[] bufferV2 = new byte[bufferV1.length]; - - // v1 forward seek and read a kilobyte into first kilobyte of bufferV1 - inputStreamV1.seek(5 * MEGABYTE); - int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE); - assertEquals(KILOBYTE, numBytesReadV1); - - // v2 forward seek and read a kilobyte into first kilobyte of bufferV2 - inputStreamV2.seek(5 * MEGABYTE); - int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE); - assertEquals(KILOBYTE, numBytesReadV2); - - assertArrayEquals(bufferV1, bufferV2); - - int len = MEGABYTE; - int offset = bufferV1.length - len; - - // v1 reverse seek and read a megabyte into last megabyte of bufferV1 - inputStreamV1.seek(3 * MEGABYTE); - numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len); - assertEquals(len, numBytesReadV1); - - // v2 reverse seek and read a megabyte into last megabyte of bufferV2 - inputStreamV2.seek(3 * MEGABYTE); - numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len); - assertEquals(len, numBytesReadV2); - - assertArrayEquals(bufferV1, bufferV2); - } - } - - @Test - public void test_0201_RandomReadTest() throws Exception { - assumeHugeFileExists(); - - try ( - FSDataInputStream inputStreamV1 - = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH); - - FSDataInputStream inputStreamV2 - = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH); - ) { - final int bufferSize = 4 * KILOBYTE; - byte[] bufferV1 = new byte[bufferSize]; - byte[] bufferV2 = new byte[bufferV1.length]; - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - inputStreamV1.seek(0); - inputStreamV2.seek(0); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - int seekPosition = 2 * KILOBYTE; - inputStreamV1.seek(seekPosition); - inputStreamV2.seek(seekPosition); - - inputStreamV1.seek(0); - inputStreamV2.seek(0); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - seekPosition = 5 * KILOBYTE; - inputStreamV1.seek(seekPosition); - inputStreamV2.seek(seekPosition); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - seekPosition = 10 * KILOBYTE; - inputStreamV1.seek(seekPosition); - inputStreamV2.seek(seekPosition); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - - seekPosition = 4100 * KILOBYTE; - inputStreamV1.seek(seekPosition); - inputStreamV2.seek(seekPosition); - - verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); - } - } - - private void verifyConsistentReads(FSDataInputStream inputStreamV1, - FSDataInputStream inputStreamV2, - byte[] bufferV1, - byte[] bufferV2) throws IOException { - int size = bufferV1.length; - final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size); - assertEquals("Bytes read from V1 stream", size, numBytesReadV1); - - final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size); - assertEquals("Bytes read from V2 stream", size, numBytesReadV2); - - assertArrayEquals("Mismatch in read data", bufferV1, bufferV2); - } - - /** - * Validates the implementation of InputStream.markSupported. - * @throws IOException - */ - @Test - public void test_0301_MarkSupportedV1() throws IOException { - validateMarkSupported(accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of InputStream.markSupported. - * @throws IOException - */ - @Test - public void test_0302_MarkSupportedV2() throws IOException { - validateMarkSupported(accountUsingInputStreamV1.getFileSystem()); - } - - private void validateMarkSupported(FileSystem fs) throws IOException { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - assertTrue("mark is not supported", inputStream.markSupported()); - } - } - - /** - * Validates the implementation of InputStream.mark and reset - * for version 1 of the block blob input stream. - * @throws Exception - */ - @Test - public void test_0303_MarkAndResetV1() throws Exception { - validateMarkAndReset(accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of InputStream.mark and reset - * for version 2 of the block blob input stream. - * @throws Exception - */ - @Test - public void test_0304_MarkAndResetV2() throws Exception { - validateMarkAndReset(accountUsingInputStreamV2.getFileSystem()); - } - - private void validateMarkAndReset(FileSystem fs) throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - inputStream.mark(KILOBYTE - 1); - - byte[] buffer = new byte[KILOBYTE]; - int bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - - inputStream.reset(); - assertEquals("rest -> pos 0", 0, inputStream.getPos()); - - inputStream.mark(8 * KILOBYTE - 1); - - buffer = new byte[8 * KILOBYTE]; - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - - intercept(IOException.class, - "Resetting to invalid mark", - new Callable<FSDataInputStream>() { - @Override - public FSDataInputStream call() throws Exception { - inputStream.reset(); - return inputStream; - } - } - ); - } - } - - /** - * Validates the implementation of Seekable.seekToNewSource, which should - * return false for version 1 of the block blob input stream. - * @throws IOException - */ - @Test - public void test_0305_SeekToNewSourceV1() throws IOException { - validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of Seekable.seekToNewSource, which should - * return false for version 2 of the block blob input stream. - * @throws IOException - */ - @Test - public void test_0306_SeekToNewSourceV2() throws IOException { - validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem()); - } - - private void validateSeekToNewSource(FileSystem fs) throws IOException { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - assertFalse(inputStream.seekToNewSource(0)); - } - } - - /** - * Validates the implementation of InputStream.skip and ensures there is no - * network I/O for version 1 of the block blob input stream. - * @throws Exception - */ - @Test - public void test_0307_SkipBoundsV1() throws Exception { - validateSkipBounds(accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of InputStream.skip and ensures there is no - * network I/O for version 2 of the block blob input stream. - * @throws Exception - */ - @Test - public void test_0308_SkipBoundsV2() throws Exception { - validateSkipBounds(accountUsingInputStreamV2.getFileSystem()); - } - - private void validateSkipBounds(FileSystem fs) throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - NanoTimer timer = new NanoTimer(); - - long skipped = inputStream.skip(-1); - assertEquals(0, skipped); - - skipped = inputStream.skip(0); - assertEquals(0, skipped); - - assertTrue(testFileLength > 0); - - skipped = inputStream.skip(testFileLength); - assertEquals(testFileLength, skipped); - - intercept(EOFException.class, - new Callable<Long>() { - @Override - public Long call() throws Exception { - return inputStream.skip(1); - } - } - ); - long elapsedTimeMs = timer.elapsedTimeMs(); - assertTrue( - String.format( - "There should not be any network I/O (elapsedTimeMs=%1$d).", - elapsedTimeMs), - elapsedTimeMs < 20); - } - } - - /** - * Validates the implementation of Seekable.seek and ensures there is no - * network I/O for forward seek. - * @throws Exception - */ - @Test - public void test_0309_SeekBoundsV1() throws Exception { - validateSeekBounds(accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of Seekable.seek and ensures there is no - * network I/O for forward seek. - * @throws Exception - */ - @Test - public void test_0310_SeekBoundsV2() throws Exception { - validateSeekBounds(accountUsingInputStreamV2.getFileSystem()); - } - - private void validateSeekBounds(FileSystem fs) throws Exception { - assumeHugeFileExists(); - try ( - FSDataInputStream inputStream = fs.open(TEST_FILE_PATH); - ) { - NanoTimer timer = new NanoTimer(); - - inputStream.seek(0); - assertEquals(0, inputStream.getPos()); - - intercept(EOFException.class, - FSExceptionMessages.NEGATIVE_SEEK, - new Callable<FSDataInputStream>() { - @Override - public FSDataInputStream call() throws Exception { - inputStream.seek(-1); - return inputStream; - } - } - ); - - assertTrue("Test file length only " + testFileLength, testFileLength > 0); - inputStream.seek(testFileLength); - assertEquals(testFileLength, inputStream.getPos()); - - intercept(EOFException.class, - FSExceptionMessages.CANNOT_SEEK_PAST_EOF, - new Callable<FSDataInputStream>() { - @Override - public FSDataInputStream call() throws Exception { - inputStream.seek(testFileLength + 1); - return inputStream; - } - } - ); - - long elapsedTimeMs = timer.elapsedTimeMs(); - assertTrue( - String.format( - "There should not be any network I/O (elapsedTimeMs=%1$d).", - elapsedTimeMs), - elapsedTimeMs < 20); - } - } - - /** - * Validates the implementation of Seekable.seek, Seekable.getPos, - * and InputStream.available. - * @throws Exception - */ - @Test - public void test_0311_SeekAndAvailableAndPositionV1() throws Exception { - validateSeekAndAvailableAndPosition( - accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of Seekable.seek, Seekable.getPos, - * and InputStream.available. - * @throws Exception - */ - @Test - public void test_0312_SeekAndAvailableAndPositionV2() throws Exception { - validateSeekAndAvailableAndPosition( - accountUsingInputStreamV2.getFileSystem()); - } - - private void validateSeekAndAvailableAndPosition(FileSystem fs) - throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; - byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; - byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; - byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; - byte[] buffer = new byte[3]; - - int bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected1, buffer); - assertEquals(buffer.length, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected2, buffer); - assertEquals(2 * buffer.length, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - - // reverse seek - int seekPos = 0; - inputStream.seek(seekPos); - - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected1, buffer); - assertEquals(buffer.length + seekPos, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - - // reverse seek - seekPos = 1; - inputStream.seek(seekPos); - - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected3, buffer); - assertEquals(buffer.length + seekPos, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - - // forward seek - seekPos = 6; - inputStream.seek(seekPos); - - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected4, buffer); - assertEquals(buffer.length + seekPos, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - } - } - - /** - * Validates the implementation of InputStream.skip, Seekable.getPos, - * and InputStream.available. - * @throws IOException - */ - @Test - public void test_0313_SkipAndAvailableAndPositionV1() throws IOException { - validateSkipAndAvailableAndPosition( - accountUsingInputStreamV1.getFileSystem()); - } - - /** - * Validates the implementation of InputStream.skip, Seekable.getPos, - * and InputStream.available. - * @throws IOException - */ - @Test - public void test_0314_SkipAndAvailableAndPositionV2() throws IOException { - validateSkipAndAvailableAndPosition( - accountUsingInputStreamV1.getFileSystem()); - } - - private void validateSkipAndAvailableAndPosition(FileSystem fs) - throws IOException { - assumeHugeFileExists(); - try ( - FSDataInputStream inputStream = fs.open(TEST_FILE_PATH); - ) { - byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; - byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; - byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; - byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; - - assertEquals(testFileLength, inputStream.available()); - assertEquals(0, inputStream.getPos()); - - int n = 3; - long skipped = inputStream.skip(n); - - assertEquals(skipped, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - assertEquals(skipped, n); - - byte[] buffer = new byte[3]; - int bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected2, buffer); - assertEquals(buffer.length + skipped, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - - // does skip still work after seek? - int seekPos = 1; - inputStream.seek(seekPos); - - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected3, buffer); - assertEquals(buffer.length + seekPos, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - - long currentPosition = inputStream.getPos(); - n = 2; - skipped = inputStream.skip(n); - - assertEquals(currentPosition + skipped, inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - assertEquals(skipped, n); - - bytesRead = inputStream.read(buffer); - assertEquals(buffer.length, bytesRead); - assertArrayEquals(expected4, buffer); - assertEquals(buffer.length + skipped + currentPosition, - inputStream.getPos()); - assertEquals(testFileLength - inputStream.getPos(), - inputStream.available()); - } - } - - /** - * Ensures parity in the performance of sequential read for - * version 1 and version 2 of the block blob input stream. - * @throws IOException - */ - @Test - public void test_0315_SequentialReadPerformance() throws IOException { - assumeHugeFileExists(); - final int maxAttempts = 10; - final double maxAcceptableRatio = 1.01; - double v1ElapsedMs = 0, v2ElapsedMs = 0; - double ratio = Double.MAX_VALUE; - for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { - v1ElapsedMs = sequentialRead(1, - accountUsingInputStreamV1.getFileSystem(), false); - v2ElapsedMs = sequentialRead(2, - accountUsingInputStreamV2.getFileSystem(), false); - ratio = v2ElapsedMs / v1ElapsedMs; - LOG.info(String.format( - "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f", - (long) v1ElapsedMs, - (long) v2ElapsedMs, - ratio)); - } - assertTrue(String.format( - "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d," - + " v2ElapsedMs=%2$d, ratio=%3$.2f", - (long) v1ElapsedMs, - (long) v2ElapsedMs, - ratio), - ratio < maxAcceptableRatio); - } - - /** - * Ensures parity in the performance of sequential read after reverse seek for - * version 2 of the block blob input stream. - * @throws IOException - */ - @Test - public void test_0316_SequentialReadAfterReverseSeekPerformanceV2() - throws IOException { - assumeHugeFileExists(); - final int maxAttempts = 10; - final double maxAcceptableRatio = 1.01; - double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0; - double ratio = Double.MAX_VALUE; - for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { - beforeSeekElapsedMs = sequentialRead(2, - accountUsingInputStreamV2.getFileSystem(), false); - afterSeekElapsedMs = sequentialRead(2, - accountUsingInputStreamV2.getFileSystem(), true); - ratio = afterSeekElapsedMs / beforeSeekElapsedMs; - LOG.info(String.format( - "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f", - (long) beforeSeekElapsedMs, - (long) afterSeekElapsedMs, - ratio)); - } - assertTrue(String.format( - "Performance of version 2 after reverse seek is not acceptable:" - + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d," - + " ratio=%3$.2f", - (long) beforeSeekElapsedMs, - (long) afterSeekElapsedMs, - ratio), - ratio < maxAcceptableRatio); - } - - private long sequentialRead(int version, - FileSystem fs, - boolean afterReverseSeek) throws IOException { - byte[] buffer = new byte[16 * KILOBYTE]; - long totalBytesRead = 0; - long bytesRead = 0; - - try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - if (afterReverseSeek) { - while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) { - bytesRead = inputStream.read(buffer); - totalBytesRead += bytesRead; - } - totalBytesRead = 0; - inputStream.seek(0); - } - - NanoTimer timer = new NanoTimer(); - while ((bytesRead = inputStream.read(buffer)) > 0) { - totalBytesRead += bytesRead; - } - long elapsedTimeMs = timer.elapsedTimeMs(); - - LOG.info(String.format( - "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f," - + " afterReverseSeek=%5$s", - version, - totalBytesRead, - elapsedTimeMs, - toMbps(totalBytesRead, elapsedTimeMs), - afterReverseSeek)); - - assertEquals(testFileLength, totalBytesRead); - inputStream.close(); - return elapsedTimeMs; - } - } - - @Test - public void test_0317_RandomReadPerformance() throws IOException { - assumeHugeFileExists(); - final int maxAttempts = 10; - final double maxAcceptableRatio = 0.10; - double v1ElapsedMs = 0, v2ElapsedMs = 0; - double ratio = Double.MAX_VALUE; - for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { - v1ElapsedMs = randomRead(1, - accountUsingInputStreamV1.getFileSystem()); - v2ElapsedMs = randomRead(2, - accountUsingInputStreamV2.getFileSystem()); - ratio = v2ElapsedMs / v1ElapsedMs; - LOG.info(String.format( - "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f", - (long) v1ElapsedMs, - (long) v2ElapsedMs, - ratio)); - } - assertTrue(String.format( - "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d," - + " v2ElapsedMs=%2$d, ratio=%3$.2f", - (long) v1ElapsedMs, - (long) v2ElapsedMs, - ratio), - ratio < maxAcceptableRatio); - } - - private long randomRead(int version, FileSystem fs) throws IOException { - assumeHugeFileExists(); - final int minBytesToRead = 2 * MEGABYTE; - Random random = new Random(); - byte[] buffer = new byte[8 * KILOBYTE]; - long totalBytesRead = 0; - long bytesRead = 0; - try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { - NanoTimer timer = new NanoTimer(); - - do { - bytesRead = inputStream.read(buffer); - totalBytesRead += bytesRead; - inputStream.seek(random.nextInt( - (int) (testFileLength - buffer.length))); - } while (bytesRead > 0 && totalBytesRead < minBytesToRead); - - long elapsedTimeMs = timer.elapsedTimeMs(); - - inputStream.close(); - - LOG.info(String.format( - "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f", - version, - totalBytesRead, - elapsedTimeMs, - toMbps(totalBytesRead, elapsedTimeMs))); - - assertTrue(minBytesToRead <= totalBytesRead); - - return elapsedTimeMs; - } - } - - @Test - public void test_999_DeleteHugeFiles() throws IOException { - ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - fs.delete(TEST_FILE_PATH, false); - timer.end("time to delete %s", TEST_FILE_PATH); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java index 307e5af..c2496d7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java @@ -21,13 +21,10 @@ package org.apache.hadoop.fs.azure; import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; import org.junit.Test; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; - /** * Tests for <code>ClientThrottlingAnalyzer</code>. */ -public class TestClientThrottlingAnalyzer { +public class TestClientThrottlingAnalyzer extends AbstractWasbTestWithTimeout { private static final int ANALYSIS_PERIOD = 1000; private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD + ANALYSIS_PERIOD / 10; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org