http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java new file mode 100644 index 0000000..0aa9393 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_CHECK_BLOCK_MD5; +import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_STORE_BLOB_MD5; +import static org.junit.Assume.assumeNotNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.util.Arrays; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext; +import org.apache.hadoop.fs.azure.integration.AzureTestUtils; + +import org.junit.After; +import org.junit.Test; + +import com.microsoft.azure.storage.Constants; +import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.ResponseReceivedEvent; +import com.microsoft.azure.storage.StorageErrorCodeStrings; +import com.microsoft.azure.storage.StorageEvent; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlockEntry; +import com.microsoft.azure.storage.blob.BlockSearchMode; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.core.Base64; + +/** + * Test that we do proper data integrity validation with MD5 checks as + * configured. + */ +public class ITestBlobDataValidation extends AbstractWasbTestWithTimeout { + private AzureBlobStorageTestAccount testAccount; + + @After + public void tearDown() throws Exception { + testAccount = AzureTestUtils.cleanupTestAccount(testAccount); + } + + /** + * Test that by default we don't store the blob-level MD5. + */ + @Test + public void testBlobMd5StoreOffByDefault() throws Exception { + testAccount = AzureBlobStorageTestAccount.create(); + testStoreBlobMd5(false); + } + + /** + * Test that we get blob-level MD5 storage and validation if we specify that + * in the configuration. + */ + @Test + public void testStoreBlobMd5() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(KEY_STORE_BLOB_MD5, true); + testAccount = AzureBlobStorageTestAccount.create(conf); + testStoreBlobMd5(true); + } + + /** + * Trims a suffix/prefix from the given string. For example if + * s is given as "/xy" and toTrim is "/", this method returns "xy" + */ + private static String trim(String s, String toTrim) { + return StringUtils.removeEnd(StringUtils.removeStart(s, toTrim), + toTrim); + } + + private void testStoreBlobMd5(boolean expectMd5Stored) throws Exception { + assumeNotNull(testAccount); + // Write a test file. + NativeAzureFileSystem fs = testAccount.getFileSystem(); + Path testFilePath = AzureTestUtils.pathForTests(fs, + methodName.getMethodName()); + String testFileKey = trim(testFilePath.toUri().getPath(), "/"); + OutputStream outStream = fs.create(testFilePath); + outStream.write(new byte[] { 5, 15 }); + outStream.close(); + + // Check that we stored/didn't store the MD5 field as configured. + CloudBlockBlob blob = testAccount.getBlobReference(testFileKey); + blob.downloadAttributes(); + String obtainedMd5 = blob.getProperties().getContentMD5(); + if (expectMd5Stored) { + assertNotNull(obtainedMd5); + } else { + assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5); + } + + // Mess with the content so it doesn't match the MD5. + String newBlockId = Base64.encode(new byte[] { 55, 44, 33, 22 }); + blob.uploadBlock(newBlockId, + new ByteArrayInputStream(new byte[] { 6, 45 }), 2); + blob.commitBlockList(Arrays.asList(new BlockEntry[] { new BlockEntry( + newBlockId, BlockSearchMode.UNCOMMITTED) })); + + // Now read back the content. If we stored the MD5 for the blob content + // we should get a data corruption error. + InputStream inStream = fs.open(testFilePath); + try { + byte[] inBuf = new byte[100]; + while (inStream.read(inBuf) > 0){ + //nothing; + } + inStream.close(); + if (expectMd5Stored) { + fail("Should've thrown because of data corruption."); + } + } catch (IOException ex) { + if (!expectMd5Stored) { + throw ex; + } + StorageException cause = (StorageException)ex.getCause(); + assertNotNull(cause); + assertEquals("Unexpected cause: " + cause, + StorageErrorCodeStrings.INVALID_MD5, cause.getErrorCode()); + } + } + + /** + * Test that by default we check block-level MD5. + */ + @Test + public void testCheckBlockMd5() throws Exception { + testAccount = AzureBlobStorageTestAccount.create(); + testCheckBlockMd5(true); + } + + /** + * Test that we don't check block-level MD5 if we specify that in the + * configuration. + */ + @Test + public void testDontCheckBlockMd5() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(KEY_CHECK_BLOCK_MD5, false); + testAccount = AzureBlobStorageTestAccount.create(conf); + testCheckBlockMd5(false); + } + + /** + * Connection inspector to check that MD5 fields for content is set/not set as + * expected. + */ + private static class ContentMD5Checker extends + StorageEvent<ResponseReceivedEvent> { + private final boolean expectMd5; + + public ContentMD5Checker(boolean expectMd5) { + this.expectMd5 = expectMd5; + } + + @Override + public void eventOccurred(ResponseReceivedEvent eventArg) { + HttpURLConnection connection = (HttpURLConnection) eventArg + .getConnectionObject(); + if (isGetRange(connection)) { + checkObtainedMd5(connection + .getHeaderField(Constants.HeaderConstants.CONTENT_MD5)); + } else if (isPutBlock(connection)) { + checkObtainedMd5(connection + .getRequestProperty(Constants.HeaderConstants.CONTENT_MD5)); + } + } + + private void checkObtainedMd5(String obtainedMd5) { + if (expectMd5) { + assertNotNull(obtainedMd5); + } else { + assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5); + } + } + + private static boolean isPutBlock(HttpURLConnection connection) { + return connection.getRequestMethod().equals("PUT") + && connection.getURL().getQuery() != null + && connection.getURL().getQuery().contains("blockid"); + } + + private static boolean isGetRange(HttpURLConnection connection) { + return connection.getRequestMethod().equals("GET") + && connection + .getHeaderField(Constants.HeaderConstants.STORAGE_RANGE_HEADER) != null; + } + } + + private void testCheckBlockMd5(final boolean expectMd5Checked) + throws Exception { + assumeNotNull(testAccount); + Path testFilePath = new Path("/testFile"); + + // Add a hook to check that for GET/PUT requests we set/don't set + // the block-level MD5 field as configured. I tried to do clever + // testing by also messing with the raw data to see if we actually + // validate the data as expected, but the HttpURLConnection wasn't + // pluggable enough for me to do that. + testAccount.getFileSystem().getStore() + .addTestHookToOperationContext(new TestHookOperationContext() { + @Override + public OperationContext modifyOperationContext( + OperationContext original) { + original.getResponseReceivedEventHandler().addListener( + new ContentMD5Checker(expectMd5Checked)); + return original; + } + }); + + OutputStream outStream = testAccount.getFileSystem().create(testFilePath); + outStream.write(new byte[] { 5, 15 }); + outStream.close(); + + InputStream inStream = testAccount.getFileSystem().open(testFilePath); + byte[] inBuf = new byte[100]; + while (inStream.read(inBuf) > 0){ + //nothing; + } + inStream.close(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java new file mode 100644 index 0000000..b46ad5b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Date; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.integration.AzureTestUtils; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; + + +/** + * A simple benchmark to find out the difference in speed between block + * and page blobs. + */ +public class ITestBlobTypeSpeedDifference extends AbstractWasbTestBase { + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + + /** + * Writes data to the given stream of the given size, flushing every + * x bytes. + */ + private static void writeTestFile(OutputStream writeStream, + long size, long flushInterval) throws IOException { + int bufferSize = (int) Math.min(1000, flushInterval); + byte[] buffer = new byte[bufferSize]; + Arrays.fill(buffer, (byte) 7); + int bytesWritten = 0; + int bytesUnflushed = 0; + while (bytesWritten < size) { + int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten); + writeStream.write(buffer, 0, numberToWrite); + bytesWritten += numberToWrite; + bytesUnflushed += numberToWrite; + if (bytesUnflushed >= flushInterval) { + writeStream.flush(); + bytesUnflushed = 0; + } + } + } + + private static class TestResult { + final long timeTakenInMs; + final long totalNumberOfRequests; + + TestResult(long timeTakenInMs, long totalNumberOfRequests) { + this.timeTakenInMs = timeTakenInMs; + this.totalNumberOfRequests = totalNumberOfRequests; + } + } + + /** + * Writes data to the given file of the given size, flushing every + * x bytes. Measure performance of that and return it. + */ + private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path, + long size, long flushInterval) throws IOException { + AzureFileSystemInstrumentation instrumentation = + fs.getInstrumentation(); + long initialRequests = instrumentation.getCurrentWebResponses(); + Date start = new Date(); + OutputStream output = fs.create(path); + writeTestFile(output, size, flushInterval); + output.close(); + long finalRequests = instrumentation.getCurrentWebResponses(); + return new TestResult(new Date().getTime() - start.getTime(), + finalRequests - initialRequests); + } + + /** + * Writes data to a block blob of the given size, flushing every + * x bytes. Measure performance of that and return it. + */ + private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs, + long size, long flushInterval) throws IOException { + return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval); + } + + /** + * Writes data to a page blob of the given size, flushing every + * x bytes. Measure performance of that and return it. + */ + private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs, + long size, long flushInterval) throws IOException { + Path testFile = AzureTestUtils.blobPathForTests(fs, + "writePageBlobTestFile"); + return writeTestFile(fs, + testFile, + size, flushInterval); + } + + /** + * Runs the benchmark over a small 10 KB file, flushing every 500 bytes. + */ + @Test + public void testTenKbFileFrequentFlush() throws Exception { + testForSizeAndFlushInterval(getFileSystem(), 10 * 1000, 500); + } + + /** + * Runs the benchmark for the given file size and flush frequency. + */ + private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs, + final long size, final long flushInterval) throws IOException { + for (int i = 0; i < 5; i++) { + TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval); + System.out.printf( + "Page blob upload took %d ms. Total number of requests: %d.\n", + pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests); + TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval); + System.out.printf( + "Block blob upload took %d ms. Total number of requests: %d.\n", + blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests); + } + } + + /** + * Runs the benchmark for the given file size and flush frequency from the + * command line. + */ + public static void main(String[] argv) throws Exception { + Configuration conf = new Configuration(); + long size = 10 * 1000 * 1000; + long flushInterval = 2000; + if (argv.length > 0) { + size = Long.parseLong(argv[0]); + } + if (argv.length > 1) { + flushInterval = Long.parseLong(argv[1]); + } + testForSizeAndFlushInterval( + (NativeAzureFileSystem) FileSystem.get(conf), + size, + flushInterval); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java new file mode 100644 index 0000000..07a13df --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java @@ -0,0 +1,874 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.EnumSet; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest; +import org.apache.hadoop.fs.azure.integration.AzureTestUtils; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; + +import static org.junit.Assume.assumeNotNull; + +import static org.apache.hadoop.test.LambdaTestUtils.*; + +/** + * Test semantics and performance of the original block blob input stream + * (KEY_INPUT_STREAM_VERSION=1) and the new + * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2). + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) + +public class ITestBlockBlobInputStream extends AbstractAzureScaleTest { + private static final Logger LOG = LoggerFactory.getLogger( + ITestBlockBlobInputStream.class); + private static final int KILOBYTE = 1024; + private static final int MEGABYTE = KILOBYTE * KILOBYTE; + private static final int TEST_FILE_SIZE = 6 * MEGABYTE; + private static final Path TEST_FILE_PATH = new Path( + "TestBlockBlobInputStream.txt"); + + private AzureBlobStorageTestAccount accountUsingInputStreamV1; + private AzureBlobStorageTestAccount accountUsingInputStreamV2; + private long testFileLength; + + + + private FileStatus testFileStatus; + private Path hugefile; + + @Override + public void setUp() throws Exception { + super.setUp(); + Configuration conf = new Configuration(); + conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1); + + accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), + conf, + true); + + accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), + null, + true); + + assumeNotNull(accountUsingInputStreamV1); + assumeNotNull(accountUsingInputStreamV2); + hugefile = fs.makeQualified(TEST_FILE_PATH); + try { + testFileStatus = fs.getFileStatus(TEST_FILE_PATH); + testFileLength = testFileStatus.getLen(); + } catch (FileNotFoundException e) { + // file doesn't exist + testFileLength = 0; + } + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1); + + accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), + conf, + true); + + accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), + null, + true); + + assumeNotNull(accountUsingInputStreamV1); + assumeNotNull(accountUsingInputStreamV2); + return accountUsingInputStreamV1; + } + + /** + * Create a test file by repeating the characters in the alphabet. + * @throws IOException + */ + private void createTestFileAndSetLength() throws IOException { + FileSystem fs = accountUsingInputStreamV1.getFileSystem(); + + // To reduce test run time, the test file can be reused. + if (fs.exists(TEST_FILE_PATH)) { + testFileStatus = fs.getFileStatus(TEST_FILE_PATH); + testFileLength = testFileStatus.getLen(); + LOG.info("Reusing test file: {}", testFileStatus); + return; + } + + int sizeOfAlphabet = ('z' - 'a' + 1); + byte[] buffer = new byte[26 * KILOBYTE]; + char character = 'a'; + for (int i = 0; i < buffer.length; i++) { + buffer[i] = (byte) character; + character = (character == 'z') ? 'a' : (char) ((int) character + 1); + } + + LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH, + TEST_FILE_SIZE); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + + try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { + int bytesWritten = 0; + while (bytesWritten < TEST_FILE_SIZE) { + outputStream.write(buffer); + bytesWritten += buffer.length; + } + LOG.info("Closing stream {}", outputStream); + ContractTestUtils.NanoTimer closeTimer + = new ContractTestUtils.NanoTimer(); + outputStream.close(); + closeTimer.end("time to close() output stream"); + } + timer.end("time to write %d KB", TEST_FILE_SIZE / 1024); + testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen(); + } + + void assumeHugeFileExists() throws IOException { + ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile); + FileStatus status = fs.getFileStatus(hugefile); + ContractTestUtils.assertIsFile(hugefile, status); + assertTrue("File " + hugefile + " is empty", status.getLen() > 0); + } + + /** + * Calculate megabits per second from the specified values for bytes and + * milliseconds. + * @param bytes The number of bytes. + * @param milliseconds The number of milliseconds. + * @return The number of megabits per second. + */ + private static double toMbps(long bytes, long milliseconds) { + return bytes / 1000.0 * 8 / milliseconds; + } + + @Test + public void test_0100_CreateHugeFile() throws IOException { + createTestFileAndSetLength(); + } + + @Test + public void test_0200_BasicReadTest() throws Exception { + assumeHugeFileExists(); + + try ( + FSDataInputStream inputStreamV1 + = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH); + + FSDataInputStream inputStreamV2 + = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH); + ) { + byte[] bufferV1 = new byte[3 * MEGABYTE]; + byte[] bufferV2 = new byte[bufferV1.length]; + + // v1 forward seek and read a kilobyte into first kilobyte of bufferV1 + inputStreamV1.seek(5 * MEGABYTE); + int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE); + assertEquals(KILOBYTE, numBytesReadV1); + + // v2 forward seek and read a kilobyte into first kilobyte of bufferV2 + inputStreamV2.seek(5 * MEGABYTE); + int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE); + assertEquals(KILOBYTE, numBytesReadV2); + + assertArrayEquals(bufferV1, bufferV2); + + int len = MEGABYTE; + int offset = bufferV1.length - len; + + // v1 reverse seek and read a megabyte into last megabyte of bufferV1 + inputStreamV1.seek(3 * MEGABYTE); + numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len); + assertEquals(len, numBytesReadV1); + + // v2 reverse seek and read a megabyte into last megabyte of bufferV2 + inputStreamV2.seek(3 * MEGABYTE); + numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len); + assertEquals(len, numBytesReadV2); + + assertArrayEquals(bufferV1, bufferV2); + } + } + + @Test + public void test_0201_RandomReadTest() throws Exception { + assumeHugeFileExists(); + + try ( + FSDataInputStream inputStreamV1 + = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH); + + FSDataInputStream inputStreamV2 + = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH); + ) { + final int bufferSize = 4 * KILOBYTE; + byte[] bufferV1 = new byte[bufferSize]; + byte[] bufferV2 = new byte[bufferV1.length]; + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + inputStreamV1.seek(0); + inputStreamV2.seek(0); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + int seekPosition = 2 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + inputStreamV1.seek(0); + inputStreamV2.seek(0); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + seekPosition = 5 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + seekPosition = 10 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + seekPosition = 4100 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + } + } + + private void verifyConsistentReads(FSDataInputStream inputStreamV1, + FSDataInputStream inputStreamV2, + byte[] bufferV1, + byte[] bufferV2) throws IOException { + int size = bufferV1.length; + final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size); + assertEquals("Bytes read from V1 stream", size, numBytesReadV1); + + final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size); + assertEquals("Bytes read from V2 stream", size, numBytesReadV2); + + assertArrayEquals("Mismatch in read data", bufferV1, bufferV2); + } + + /** + * Validates the implementation of InputStream.markSupported. + * @throws IOException + */ + @Test + public void test_0301_MarkSupportedV1() throws IOException { + validateMarkSupported(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.markSupported. + * @throws IOException + */ + @Test + public void test_0302_MarkSupportedV2() throws IOException { + validateMarkSupported(accountUsingInputStreamV1.getFileSystem()); + } + + private void validateMarkSupported(FileSystem fs) throws IOException { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + assertTrue("mark is not supported", inputStream.markSupported()); + } + } + + /** + * Validates the implementation of InputStream.mark and reset + * for version 1 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0303_MarkAndResetV1() throws Exception { + validateMarkAndReset(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.mark and reset + * for version 2 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0304_MarkAndResetV2() throws Exception { + validateMarkAndReset(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateMarkAndReset(FileSystem fs) throws Exception { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + inputStream.mark(KILOBYTE - 1); + + byte[] buffer = new byte[KILOBYTE]; + int bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + + inputStream.reset(); + assertEquals("rest -> pos 0", 0, inputStream.getPos()); + + inputStream.mark(8 * KILOBYTE - 1); + + buffer = new byte[8 * KILOBYTE]; + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + + intercept(IOException.class, + "Resetting to invalid mark", + new Callable<FSDataInputStream>() { + @Override + public FSDataInputStream call() throws Exception { + inputStream.reset(); + return inputStream; + } + } + ); + } + } + + /** + * Validates the implementation of Seekable.seekToNewSource, which should + * return false for version 1 of the block blob input stream. + * @throws IOException + */ + @Test + public void test_0305_SeekToNewSourceV1() throws IOException { + validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of Seekable.seekToNewSource, which should + * return false for version 2 of the block blob input stream. + * @throws IOException + */ + @Test + public void test_0306_SeekToNewSourceV2() throws IOException { + validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSeekToNewSource(FileSystem fs) throws IOException { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + assertFalse(inputStream.seekToNewSource(0)); + } + } + + /** + * Validates the implementation of InputStream.skip and ensures there is no + * network I/O for version 1 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0307_SkipBoundsV1() throws Exception { + validateSkipBounds(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.skip and ensures there is no + * network I/O for version 2 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0308_SkipBoundsV2() throws Exception { + validateSkipBounds(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSkipBounds(FileSystem fs) throws Exception { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + NanoTimer timer = new NanoTimer(); + + long skipped = inputStream.skip(-1); + assertEquals(0, skipped); + + skipped = inputStream.skip(0); + assertEquals(0, skipped); + + assertTrue(testFileLength > 0); + + skipped = inputStream.skip(testFileLength); + assertEquals(testFileLength, skipped); + + intercept(EOFException.class, + new Callable<Long>() { + @Override + public Long call() throws Exception { + return inputStream.skip(1); + } + } + ); + long elapsedTimeMs = timer.elapsedTimeMs(); + assertTrue( + String.format( + "There should not be any network I/O (elapsedTimeMs=%1$d).", + elapsedTimeMs), + elapsedTimeMs < 20); + } + } + + /** + * Validates the implementation of Seekable.seek and ensures there is no + * network I/O for forward seek. + * @throws Exception + */ + @Test + public void test_0309_SeekBoundsV1() throws Exception { + validateSeekBounds(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of Seekable.seek and ensures there is no + * network I/O for forward seek. + * @throws Exception + */ + @Test + public void test_0310_SeekBoundsV2() throws Exception { + validateSeekBounds(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSeekBounds(FileSystem fs) throws Exception { + assumeHugeFileExists(); + try ( + FSDataInputStream inputStream = fs.open(TEST_FILE_PATH); + ) { + NanoTimer timer = new NanoTimer(); + + inputStream.seek(0); + assertEquals(0, inputStream.getPos()); + + intercept(EOFException.class, + FSExceptionMessages.NEGATIVE_SEEK, + new Callable<FSDataInputStream>() { + @Override + public FSDataInputStream call() throws Exception { + inputStream.seek(-1); + return inputStream; + } + } + ); + + assertTrue("Test file length only " + testFileLength, testFileLength > 0); + inputStream.seek(testFileLength); + assertEquals(testFileLength, inputStream.getPos()); + + intercept(EOFException.class, + FSExceptionMessages.CANNOT_SEEK_PAST_EOF, + new Callable<FSDataInputStream>() { + @Override + public FSDataInputStream call() throws Exception { + inputStream.seek(testFileLength + 1); + return inputStream; + } + } + ); + + long elapsedTimeMs = timer.elapsedTimeMs(); + assertTrue( + String.format( + "There should not be any network I/O (elapsedTimeMs=%1$d).", + elapsedTimeMs), + elapsedTimeMs < 20); + } + } + + /** + * Validates the implementation of Seekable.seek, Seekable.getPos, + * and InputStream.available. + * @throws Exception + */ + @Test + public void test_0311_SeekAndAvailableAndPositionV1() throws Exception { + validateSeekAndAvailableAndPosition( + accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of Seekable.seek, Seekable.getPos, + * and InputStream.available. + * @throws Exception + */ + @Test + public void test_0312_SeekAndAvailableAndPositionV2() throws Exception { + validateSeekAndAvailableAndPosition( + accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSeekAndAvailableAndPosition(FileSystem fs) + throws Exception { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; + byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; + byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; + byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; + byte[] buffer = new byte[3]; + + int bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected1, buffer); + assertEquals(buffer.length, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected2, buffer); + assertEquals(2 * buffer.length, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // reverse seek + int seekPos = 0; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected1, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // reverse seek + seekPos = 1; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected3, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // forward seek + seekPos = 6; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected4, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + } + } + + /** + * Validates the implementation of InputStream.skip, Seekable.getPos, + * and InputStream.available. + * @throws IOException + */ + @Test + public void test_0313_SkipAndAvailableAndPositionV1() throws IOException { + validateSkipAndAvailableAndPosition( + accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.skip, Seekable.getPos, + * and InputStream.available. + * @throws IOException + */ + @Test + public void test_0314_SkipAndAvailableAndPositionV2() throws IOException { + validateSkipAndAvailableAndPosition( + accountUsingInputStreamV1.getFileSystem()); + } + + private void validateSkipAndAvailableAndPosition(FileSystem fs) + throws IOException { + assumeHugeFileExists(); + try ( + FSDataInputStream inputStream = fs.open(TEST_FILE_PATH); + ) { + byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; + byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; + byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; + byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; + + assertEquals(testFileLength, inputStream.available()); + assertEquals(0, inputStream.getPos()); + + int n = 3; + long skipped = inputStream.skip(n); + + assertEquals(skipped, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + assertEquals(skipped, n); + + byte[] buffer = new byte[3]; + int bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected2, buffer); + assertEquals(buffer.length + skipped, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // does skip still work after seek? + int seekPos = 1; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected3, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + long currentPosition = inputStream.getPos(); + n = 2; + skipped = inputStream.skip(n); + + assertEquals(currentPosition + skipped, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + assertEquals(skipped, n); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected4, buffer); + assertEquals(buffer.length + skipped + currentPosition, + inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + } + } + + /** + * Ensures parity in the performance of sequential read for + * version 1 and version 2 of the block blob input stream. + * @throws IOException + */ + @Test + public void test_0315_SequentialReadPerformance() throws IOException { + assumeHugeFileExists(); + final int maxAttempts = 10; + final double maxAcceptableRatio = 1.01; + double v1ElapsedMs = 0, v2ElapsedMs = 0; + double ratio = Double.MAX_VALUE; + for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { + v1ElapsedMs = sequentialRead(1, + accountUsingInputStreamV1.getFileSystem(), false); + v2ElapsedMs = sequentialRead(2, + accountUsingInputStreamV2.getFileSystem(), false); + ratio = v2ElapsedMs / v1ElapsedMs; + LOG.info(String.format( + "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio)); + } + assertTrue(String.format( + "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d," + + " v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio), + ratio < maxAcceptableRatio); + } + + /** + * Ensures parity in the performance of sequential read after reverse seek for + * version 2 of the block blob input stream. + * @throws IOException + */ + @Test + public void test_0316_SequentialReadAfterReverseSeekPerformanceV2() + throws IOException { + assumeHugeFileExists(); + final int maxAttempts = 10; + final double maxAcceptableRatio = 1.01; + double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0; + double ratio = Double.MAX_VALUE; + for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { + beforeSeekElapsedMs = sequentialRead(2, + accountUsingInputStreamV2.getFileSystem(), false); + afterSeekElapsedMs = sequentialRead(2, + accountUsingInputStreamV2.getFileSystem(), true); + ratio = afterSeekElapsedMs / beforeSeekElapsedMs; + LOG.info(String.format( + "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f", + (long) beforeSeekElapsedMs, + (long) afterSeekElapsedMs, + ratio)); + } + assertTrue(String.format( + "Performance of version 2 after reverse seek is not acceptable:" + + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d," + + " ratio=%3$.2f", + (long) beforeSeekElapsedMs, + (long) afterSeekElapsedMs, + ratio), + ratio < maxAcceptableRatio); + } + + private long sequentialRead(int version, + FileSystem fs, + boolean afterReverseSeek) throws IOException { + byte[] buffer = new byte[16 * KILOBYTE]; + long totalBytesRead = 0; + long bytesRead = 0; + + try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + if (afterReverseSeek) { + while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) { + bytesRead = inputStream.read(buffer); + totalBytesRead += bytesRead; + } + totalBytesRead = 0; + inputStream.seek(0); + } + + NanoTimer timer = new NanoTimer(); + while ((bytesRead = inputStream.read(buffer)) > 0) { + totalBytesRead += bytesRead; + } + long elapsedTimeMs = timer.elapsedTimeMs(); + + LOG.info(String.format( + "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f," + + " afterReverseSeek=%5$s", + version, + totalBytesRead, + elapsedTimeMs, + toMbps(totalBytesRead, elapsedTimeMs), + afterReverseSeek)); + + assertEquals(testFileLength, totalBytesRead); + inputStream.close(); + return elapsedTimeMs; + } + } + + @Test + public void test_0317_RandomReadPerformance() throws IOException { + assumeHugeFileExists(); + final int maxAttempts = 10; + final double maxAcceptableRatio = 0.10; + double v1ElapsedMs = 0, v2ElapsedMs = 0; + double ratio = Double.MAX_VALUE; + for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { + v1ElapsedMs = randomRead(1, + accountUsingInputStreamV1.getFileSystem()); + v2ElapsedMs = randomRead(2, + accountUsingInputStreamV2.getFileSystem()); + ratio = v2ElapsedMs / v1ElapsedMs; + LOG.info(String.format( + "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio)); + } + assertTrue(String.format( + "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d," + + " v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio), + ratio < maxAcceptableRatio); + } + + private long randomRead(int version, FileSystem fs) throws IOException { + assumeHugeFileExists(); + final int minBytesToRead = 2 * MEGABYTE; + Random random = new Random(); + byte[] buffer = new byte[8 * KILOBYTE]; + long totalBytesRead = 0; + long bytesRead = 0; + try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + NanoTimer timer = new NanoTimer(); + + do { + bytesRead = inputStream.read(buffer); + totalBytesRead += bytesRead; + inputStream.seek(random.nextInt( + (int) (testFileLength - buffer.length))); + } while (bytesRead > 0 && totalBytesRead < minBytesToRead); + + long elapsedTimeMs = timer.elapsedTimeMs(); + + inputStream.close(); + + LOG.info(String.format( + "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f", + version, + totalBytesRead, + elapsedTimeMs, + toMbps(totalBytesRead, elapsedTimeMs))); + + assertTrue(minBytesToRead <= totalBytesRead); + + return elapsedTimeMs; + } + } + + @Test + public void test_999_DeleteHugeFiles() throws IOException { + try { + NanoTimer timer = new NanoTimer(); + NativeAzureFileSystem fs = getFileSystem(); + fs.delete(TEST_FILE_PATH, false); + timer.end("time to delete %s", TEST_FILE_PATH); + } finally { + // clean up the test account + AzureTestUtils.cleanupTestAccount(accountUsingInputStreamV1); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java new file mode 100644 index 0000000..cc3baf5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import static org.junit.Assume.assumeNotNull; + +import java.io.FileNotFoundException; +import java.util.EnumSet; +import java.util.concurrent.Callable; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions; +import org.apache.hadoop.fs.azure.integration.AzureTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; + +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import com.microsoft.azure.storage.blob.BlobOutputStream; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; + +/** + * Tests that WASB creates containers only if needed. + */ +public class ITestContainerChecks extends AbstractWasbTestWithTimeout { + private AzureBlobStorageTestAccount testAccount; + private boolean runningInSASMode = false; + + @After + public void tearDown() throws Exception { + testAccount = AzureTestUtils.cleanup(testAccount); + } + + @Before + public void setMode() { + runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration(). + getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false); + } + + @Test + public void testContainerExistAfterDoesNotExist() throws Exception { + testAccount = blobStorageTestAccount(); + assumeNotNull(testAccount); + CloudBlobContainer container = testAccount.getRealContainer(); + FileSystem fs = testAccount.getFileSystem(); + + // Starting off with the container not there + assertFalse(container.exists()); + + // A list shouldn't create the container and will set file system store + // state to DoesNotExist + try { + fs.listStatus(new Path("/")); + assertTrue("Should've thrown.", false); + } catch (FileNotFoundException ex) { + assertTrue("Unexpected exception: " + ex, + ex.getMessage().contains("does not exist.")); + } + assertFalse(container.exists()); + + // Create a container outside of the WASB FileSystem + container.create(); + // Add a file to the container outside of the WASB FileSystem + CloudBlockBlob blob = testAccount.getBlobReference("foo"); + BlobOutputStream outputStream = blob.openOutputStream(); + outputStream.write(new byte[10]); + outputStream.close(); + + // Make sure the file is visible + assertTrue(fs.exists(new Path("/foo"))); + assertTrue(container.exists()); + } + + protected AzureBlobStorageTestAccount blobStorageTestAccount() + throws Exception { + return AzureBlobStorageTestAccount.create("", + EnumSet.noneOf(CreateOptions.class)); + } + + @Test + public void testContainerCreateAfterDoesNotExist() throws Exception { + testAccount = blobStorageTestAccount(); + assumeNotNull(testAccount); + CloudBlobContainer container = testAccount.getRealContainer(); + FileSystem fs = testAccount.getFileSystem(); + + // Starting off with the container not there + assertFalse(container.exists()); + + // A list shouldn't create the container and will set file system store + // state to DoesNotExist + try { + assertNull(fs.listStatus(new Path("/"))); + assertTrue("Should've thrown.", false); + } catch (FileNotFoundException ex) { + assertTrue("Unexpected exception: " + ex, + ex.getMessage().contains("does not exist.")); + } + assertFalse(container.exists()); + + // Create a container outside of the WASB FileSystem + container.create(); + + // Write should succeed + assertTrue(fs.createNewFile(new Path("/foo"))); + assertTrue(container.exists()); + } + + @Test + public void testContainerCreateOnWrite() throws Exception { + testAccount = blobStorageTestAccount(); + assumeNotNull(testAccount); + CloudBlobContainer container = testAccount.getRealContainer(); + FileSystem fs = testAccount.getFileSystem(); + + // Starting off with the container not there + assertFalse(container.exists()); + + // A list shouldn't create the container. + try { + fs.listStatus(new Path("/")); + assertTrue("Should've thrown.", false); + } catch (FileNotFoundException ex) { + assertTrue("Unexpected exception: " + ex, + ex.getMessage().contains("does not exist.")); + } + assertFalse(container.exists()); + + // Neither should a read. + Path foo = new Path("/testContainerCreateOnWrite-foo"); + Path bar = new Path("/testContainerCreateOnWrite-bar"); + LambdaTestUtils.intercept(FileNotFoundException.class, + new Callable<String>() { + @Override + public String call() throws Exception { + fs.open(foo).close(); + return "Stream to " + foo; + } + } + ); + assertFalse(container.exists()); + + // Neither should a rename + assertFalse(fs.rename(foo, bar)); + assertFalse(container.exists()); + + // But a write should. + assertTrue(fs.createNewFile(foo)); + assertTrue(container.exists()); + } + + @Test + public void testContainerChecksWithSas() throws Exception { + + Assume.assumeFalse(runningInSASMode); + testAccount = AzureBlobStorageTestAccount.create("", + EnumSet.of(CreateOptions.UseSas)); + assumeNotNull(testAccount); + CloudBlobContainer container = testAccount.getRealContainer(); + FileSystem fs = testAccount.getFileSystem(); + + // The container shouldn't be there + assertFalse(container.exists()); + + // A write should just fail + try { + fs.createNewFile(new Path("/testContainerChecksWithSas-foo")); + assertFalse("Should've thrown.", true); + } catch (AzureException ex) { + } + assertFalse(container.exists()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java new file mode 100644 index 0000000..a45dae4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.FileNotFoundException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.junit.After; +import org.junit.Test; + +import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*; + +/** + * Single threaded exception handling. + */ +public class ITestFileSystemOperationExceptionHandling + extends AbstractWasbTestBase { + + private FSDataInputStream inputStream = null; + + private Path testPath; + private Path testFolderPath; + + @Override + public void setUp() throws Exception { + super.setUp(); + testPath = path("testfile.dat"); + testFolderPath = path("testfolder"); + } + + /** + * Helper method that creates a InputStream to validate exceptions + * for various scenarios. + */ + private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount) + throws Exception { + + FileSystem fs = testAccount.getFileSystem(); + + // Step 1: Create a file and write dummy data. + Path base = methodPath(); + Path testFilePath1 = new Path(base, "test1.dat"); + Path testFilePath2 = new Path(base, "test2.dat"); + FSDataOutputStream outputStream = fs.create(testFilePath1); + String testString = "This is a test string"; + outputStream.write(testString.getBytes()); + outputStream.close(); + + // Step 2: Open a read stream on the file. + inputStream = fs.open(testFilePath1); + + // Step 3: Rename the file + fs.rename(testFilePath1, testFilePath2); + } + + /** + * Tests a basic single threaded read scenario for Page blobs. + */ + @Test(expected=FileNotFoundException.class) + public void testSingleThreadedPageBlobReadScenario() throws Throwable { + AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); + setupInputStreamToTest(testAccount); + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /** + * Tests a basic single threaded seek scenario for Page blobs. + */ + @Test(expected=FileNotFoundException.class) + public void testSingleThreadedPageBlobSeekScenario() throws Throwable { + AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); + setupInputStreamToTest(testAccount); + inputStream.seek(5); + } + + /** + * Test a basic single thread seek scenario for Block blobs. + */ + @Test(expected=FileNotFoundException.class) + public void testSingleThreadBlockBlobSeekScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = createTestAccount(); + setupInputStreamToTest(testAccount); + inputStream.seek(5); + inputStream.read(); + } + + /** + * Tests a basic single threaded read scenario for Block blobs. + */ + @Test(expected=FileNotFoundException.class) + public void testSingledThreadBlockBlobReadScenario() throws Throwable{ + AzureBlobStorageTestAccount testAccount = createTestAccount(); + setupInputStreamToTest(testAccount); + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /** + * Tests basic single threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedBlockBlobSetPermissionScenario() throws Throwable { + + createEmptyFile(createTestAccount(), testPath); + fs.delete(testPath, true); + fs.setPermission(testPath, + new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ)); + } + + /** + * Tests basic single threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedPageBlobSetPermissionScenario() + throws Throwable { + createEmptyFile(getPageBlobTestStorageAccount(), testPath); + fs.delete(testPath, true); + fs.setOwner(testPath, "testowner", "testgroup"); + } + + /** + * Tests basic single threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedBlockBlobSetOwnerScenario() throws Throwable { + + createEmptyFile(createTestAccount(), testPath); + fs.delete(testPath, true); + fs.setOwner(testPath, "testowner", "testgroup"); + } + + /** + * Tests basic single threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedPageBlobSetOwnerScenario() throws Throwable { + createEmptyFile(getPageBlobTestStorageAccount(), + testPath); + fs.delete(testPath, true); + fs.setPermission(testPath, + new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ)); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedBlockBlobListStatusScenario() throws Throwable { + createTestFolder(createTestAccount(), + testFolderPath); + fs.delete(testFolderPath, true); + fs.listStatus(testFolderPath); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedPageBlobListStatusScenario() throws Throwable { + createTestFolder(getPageBlobTestStorageAccount(), + testFolderPath); + fs.delete(testFolderPath, true); + fs.listStatus(testFolderPath); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test + public void testSingleThreadedBlockBlobRenameScenario() throws Throwable { + + createEmptyFile(createTestAccount(), + testPath); + Path dstPath = new Path("dstFile.dat"); + fs.delete(testPath, true); + boolean renameResult = fs.rename(testPath, dstPath); + assertFalse(renameResult); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test + public void testSingleThreadedPageBlobRenameScenario() throws Throwable { + + createEmptyFile(getPageBlobTestStorageAccount(), + testPath); + Path dstPath = new Path("dstFile.dat"); + fs.delete(testPath, true); + boolean renameResult = fs.rename(testPath, dstPath); + assertFalse(renameResult); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test + public void testSingleThreadedBlockBlobDeleteScenario() throws Throwable { + + createEmptyFile(createTestAccount(), + testPath); + fs.delete(testPath, true); + boolean deleteResult = fs.delete(testPath, true); + assertFalse(deleteResult); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test + public void testSingleThreadedPageBlobDeleteScenario() throws Throwable { + + createEmptyFile(getPageBlobTestStorageAccount(), + testPath); + fs.delete(testPath, true); + boolean deleteResult = fs.delete(testPath, true); + assertFalse(deleteResult); + } + + /** + * Test basic single threaded listStatus scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedBlockBlobOpenScenario() throws Throwable { + + createEmptyFile(createTestAccount(), + testPath); + fs.delete(testPath, true); + inputStream = fs.open(testPath); + } + + /** + * Test delete then open a file. + */ + @Test(expected = FileNotFoundException.class) + public void testSingleThreadedPageBlobOpenScenario() throws Throwable { + + createEmptyFile(getPageBlobTestStorageAccount(), + testPath); + fs.delete(testPath, true); + inputStream = fs.open(testPath); + } + + @After + public void tearDown() throws Exception { + if (inputStream != null) { + inputStream.close(); + } + + ContractTestUtils.rm(fs, testPath, true, true); + super.tearDown(); + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() + throws Exception { + return AzureBlobStorageTestAccount.create(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java new file mode 100644 index 0000000..6d5e72e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.net.URI; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azure.integration.AzureTestUtils; +import org.apache.hadoop.test.GenericTestUtils; + +import com.microsoft.azure.storage.CloudStorageAccount; +import org.junit.Test; + +import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.NO_ACCESS_TO_CONTAINER_MSG; + +/** + * Test for error messages coming from SDK. + */ +public class ITestFileSystemOperationExceptionMessage + extends AbstractWasbTestWithTimeout { + + + + @Test + public void testAnonymouseCredentialExceptionMessage() throws Throwable { + + Configuration conf = AzureBlobStorageTestAccount.createTestConfiguration(); + CloudStorageAccount account = + AzureBlobStorageTestAccount.createTestAccount(conf); + AzureTestUtils.assume("No test account", account != null); + + String testStorageAccount = conf.get("fs.azure.test.account.name"); + conf = new Configuration(); + conf.set("fs.AbstractFileSystem.wasb.impl", + "org.apache.hadoop.fs.azure.Wasb"); + conf.set("fs.azure.skip.metrics", "true"); + + String testContainer = UUID.randomUUID().toString(); + String wasbUri = String.format("wasb://%s@%s", + testContainer, testStorageAccount); + + try(NativeAzureFileSystem filesystem = new NativeAzureFileSystem()) { + filesystem.initialize(new URI(wasbUri), conf); + fail("Expected an exception, got " + filesystem); + } catch (Exception ex) { + + Throwable innerException = ex.getCause(); + while (innerException != null + && !(innerException instanceof AzureException)) { + innerException = innerException.getCause(); + } + + if (innerException != null) { + GenericTestUtils.assertExceptionContains(String.format( + NO_ACCESS_TO_CONTAINER_MSG, testStorageAccount, testContainer), + ex); + } else { + fail("No inner azure exception"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java new file mode 100644 index 0000000..175a9ec --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.FileNotFoundException; + +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; + +import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*; + +/** + * Multithreaded operations on FS, verify failures are as expected. + */ +public class ITestFileSystemOperationsExceptionHandlingMultiThreaded + extends AbstractWasbTestBase { + + FSDataInputStream inputStream = null; + + private Path testPath; + private Path testFolderPath; + + @Override + public void setUp() throws Exception { + super.setUp(); + testPath = path("testfile.dat"); + testFolderPath = path("testfolder"); + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + + @Override + public void tearDown() throws Exception { + + IOUtils.closeStream(inputStream); + ContractTestUtils.rm(fs, testPath, true, false); + ContractTestUtils.rm(fs, testFolderPath, true, false); + super.tearDown(); + } + + /** + * Helper method to creates an input stream to test various scenarios. + */ + private void getInputStreamToTest(FileSystem fs, Path testPath) + throws Throwable { + + FSDataOutputStream outputStream = fs.create(testPath); + String testString = "This is a test string"; + outputStream.write(testString.getBytes()); + outputStream.close(); + + inputStream = fs.open(testPath); + } + + /** + * Test to validate correct exception is thrown for Multithreaded read + * scenario for block blobs. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedBlockBlobReadScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = createTestAccount(); + NativeAzureFileSystem fs = testAccount.getFileSystem(); + Path base = methodPath(); + Path testFilePath1 = new Path(base, "test1.dat"); + Path renamePath = new Path(base, "test2.dat"); + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread( + new RenameThread(fs, testFilePath1, renamePath)); + renameThread.start(); + + renameThread.join(); + + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /** + * Test to validate correct exception is thrown for Multithreaded seek + * scenario for block blobs. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadBlockBlobSeekScenario() throws Throwable { + +/* + AzureBlobStorageTestAccount testAccount = createTestAccount(); + fs = testAccount.getFileSystem(); +*/ + Path base = methodPath(); + Path testFilePath1 = new Path(base, "test1.dat"); + Path renamePath = new Path(base, "test2.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread( + new RenameThread(fs, testFilePath1, renamePath)); + renameThread.start(); + + renameThread.join(); + + inputStream.seek(5); + inputStream.read(); + } + + /** + * Tests basic multi threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedPageBlobSetPermissionScenario() + throws Throwable { + createEmptyFile( + getPageBlobTestStorageAccount(), + testPath); + Thread t = new Thread(new DeleteThread(fs, testPath)); + t.start(); + while (t.isAlive()) { + fs.setPermission(testPath, + new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ)); + } + fs.setPermission(testPath, + new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ)); + } + + /** + * Tests basic multi threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedBlockBlobSetPermissionScenario() + throws Throwable { + createEmptyFile(createTestAccount(), + testPath); + Thread t = new Thread(new DeleteThread(fs, testPath)); + t.start(); + while (t.isAlive()) { + fs.setPermission(testPath, + new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ)); + } + fs.setPermission(testPath, + new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ)); + } + + /** + * Tests basic multi threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedPageBlobOpenScenario() throws Throwable { + + createEmptyFile(createTestAccount(), + testPath); + Thread t = new Thread(new DeleteThread(fs, testPath)); + t.start(); + while (t.isAlive()) { + inputStream = fs.open(testPath); + inputStream.close(); + } + + inputStream = fs.open(testPath); + inputStream.close(); + } + + /** + * Tests basic multi threaded setPermission scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedBlockBlobOpenScenario() throws Throwable { + + createEmptyFile( + getPageBlobTestStorageAccount(), + testPath); + Thread t = new Thread(new DeleteThread(fs, testPath)); + t.start(); + + while (t.isAlive()) { + inputStream = fs.open(testPath); + inputStream.close(); + } + inputStream = fs.open(testPath); + inputStream.close(); + } + + /** + * Tests basic multi threaded setOwner scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedBlockBlobSetOwnerScenario() throws Throwable { + + createEmptyFile(createTestAccount(), testPath); + Thread t = new Thread(new DeleteThread(fs, testPath)); + t.start(); + while (t.isAlive()) { + fs.setOwner(testPath, "testowner", "testgroup"); + } + fs.setOwner(testPath, "testowner", "testgroup"); + } + + /** + * Tests basic multi threaded setOwner scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedPageBlobSetOwnerScenario() throws Throwable { + createEmptyFile( + getPageBlobTestStorageAccount(), + testPath); + Thread t = new Thread(new DeleteThread(fs, testPath)); + t.start(); + while (t.isAlive()) { + fs.setOwner(testPath, "testowner", "testgroup"); + } + fs.setOwner(testPath, "testowner", "testgroup"); + } + + /** + * Tests basic multi threaded listStatus scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedBlockBlobListStatusScenario() throws Throwable { + + createTestFolder(createTestAccount(), + testFolderPath); + Thread t = new Thread(new DeleteThread(fs, testFolderPath)); + t.start(); + while (t.isAlive()) { + fs.listStatus(testFolderPath); + } + fs.listStatus(testFolderPath); + } + + /** + * Tests basic multi threaded listStatus scenario. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedPageBlobListStatusScenario() throws Throwable { + + createTestFolder( + getPageBlobTestStorageAccount(), + testFolderPath); + Thread t = new Thread(new DeleteThread(fs, testFolderPath)); + t.start(); + while (t.isAlive()) { + fs.listStatus(testFolderPath); + } + fs.listStatus(testFolderPath); + } + + /** + * Test to validate correct exception is thrown for Multithreaded read + * scenario for page blobs. + */ + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedPageBlobReadScenario() throws Throwable { + + bindToTestAccount(getPageBlobTestStorageAccount()); + Path base = methodPath(); + Path testFilePath1 = new Path(base, "test1.dat"); + Path renamePath = new Path(base, "test2.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread( + new RenameThread(fs, testFilePath1, renamePath)); + renameThread.start(); + + renameThread.join(); + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /** + * Test to validate correct exception is thrown for Multithreaded seek + * scenario for page blobs. + */ + + @Test(expected = FileNotFoundException.class) + public void testMultiThreadedPageBlobSeekScenario() throws Throwable { + + bindToTestAccount(getPageBlobTestStorageAccount()); + + Path base = methodPath(); + Path testFilePath1 = new Path(base, "test1.dat"); + Path renamePath = new Path(base, "test2.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread( + new RenameThread(fs, testFilePath1, renamePath)); + renameThread.start(); + + renameThread.join(); + inputStream.seek(5); + } + + + /** + * Helper thread that just renames the test file. + */ + private static class RenameThread implements Runnable { + + private final FileSystem fs; + private final Path testPath; + private final Path renamePath; + + RenameThread(FileSystem fs, + Path testPath, + Path renamePath) { + this.fs = fs; + this.testPath = testPath; + this.renamePath = renamePath; + } + + @Override + public void run() { + try { + fs.rename(testPath, renamePath); + } catch (Exception e) { + // Swallowing the exception as the + // correctness of the test is controlled + // by the other thread + } + } + } + + private static class DeleteThread implements Runnable { + private final FileSystem fs; + private final Path testPath; + + DeleteThread(FileSystem fs, Path testPath) { + this.fs = fs; + this.testPath = testPath; + } + + @Override + public void run() { + try { + fs.delete(testPath, true); + } catch (Exception e) { + // Swallowing the exception as the + // correctness of the test is controlled + // by the other thread + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org