HADOOP-14520. WASB: Block compaction for Azure Block Blobs. Contributed by Georgi Chalakov
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13eda500 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13eda500 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13eda500 Branch: refs/heads/HDFS-10467 Commit: 13eda5000304099d1145631f9be13ce8a00b600d Parents: d77ed23 Author: Steve Loughran <ste...@apache.org> Authored: Thu Sep 7 18:35:03 2017 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Thu Sep 7 18:35:03 2017 +0100 ---------------------------------------------------------------------- .../fs/azure/AzureNativeFileSystemStore.java | 73 +- .../hadoop/fs/azure/BlockBlobAppendStream.java | 1301 +++++++++++------- .../hadoop/fs/azure/NativeAzureFileSystem.java | 77 +- .../hadoop/fs/azure/NativeFileSystemStore.java | 5 +- .../fs/azure/SecureStorageInterfaceImpl.java | 10 +- .../hadoop/fs/azure/SelfRenewingLease.java | 10 +- .../hadoop/fs/azure/StorageInterface.java | 3 +- .../hadoop/fs/azure/StorageInterfaceImpl.java | 12 +- .../fs/azure/SyncableDataOutputStream.java | 11 + .../hadoop-azure/src/site/markdown/index.md | 34 + .../hadoop/fs/azure/MockStorageInterface.java | 3 +- .../azure/TestAzureConcurrentOutOfBandIo.java | 6 +- ...estNativeAzureFileSystemBlockCompaction.java | 266 ++++ .../src/test/resources/log4j.properties | 1 + 14 files changed, 1273 insertions(+), 539 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index bd8ac68..639862f 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -203,6 +203,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private Set<String> pageBlobDirs; /** + * Configuration key to indicate the set of directories in WASB where we + * should store files as block blobs with block compaction enabled. + * + * Entries can be directory paths relative to the container (e.g. "/path") or + * fully qualified wasb:// URIs (e.g. + * wasb://contai...@example.blob.core.windows.net/path) + */ + public static final String KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES = + "fs.azure.block.blob.with.compaction.dir"; + + /** + * The set of directories where we should store files as block blobs with + * block compaction enabled. + */ + private Set<String> blockBlobWithCompationDirs; + + /** * Configuration key to indicate the set of directories in WASB where * we should do atomic folder rename synchronized with createNonRecursive. */ @@ -527,6 +544,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // User-agent userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT); + // Extract the directories that should contain block blobs with compaction + blockBlobWithCompationDirs = getDirectorySet( + KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES); + LOG.debug("Block blobs with compaction directories: {}", + setToString(blockBlobWithCompationDirs)); + // Extract directories that should have atomic rename applied. atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; @@ -1165,6 +1188,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } /** + * Checks if the given key in Azure Storage should be stored as a block blobs + * with compaction enabled instead of normal block blob. 
+ * + * @param key blob name + * @return true, if the file is in directory with block compaction enabled. + */ + public boolean isBlockBlobWithCompactionKey(String key) { + return isKeyForDirectorySet(key, blockBlobWithCompationDirs); + } + + /** * Checks if the given key in Azure storage should have synchronized * atomic folder rename createNonRecursive implemented. */ @@ -1356,7 +1390,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } @Override - public DataOutputStream storefile(String key, PermissionStatus permissionStatus) + public DataOutputStream storefile(String keyEncoded, + PermissionStatus permissionStatus, + String key) throws AzureException { try { @@ -1417,12 +1453,26 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Get the blob reference from the store's container and // return it. - CloudBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(keyEncoded); storePermissionStatus(blob, permissionStatus); // Create the output stream for the Azure blob. 
// - OutputStream outputStream = openOutputStream(blob); + OutputStream outputStream; + + if (isBlockBlobWithCompactionKey(key)) { + BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream( + (CloudBlockBlobWrapper) blob, + keyEncoded, + this.uploadBlockSizeBytes, + true, + getInstrumentedContext()); + + outputStream = blockBlobOutputStream; + } else { + outputStream = openOutputStream(blob); + } + DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream); return dataOutStream; } catch (Exception e) { @@ -2869,10 +2919,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { CloudBlobWrapper blob = this.container.getBlockBlobReference(key); - BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext()); - appendStream.initialize(); + OutputStream outputStream; + + BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream( + (CloudBlockBlobWrapper) blob, + key, + bufferSize, + isBlockBlobWithCompactionKey(key), + getInstrumentedContext()); + + outputStream = blockBlobOutputStream; + + DataOutputStream dataOutStream = new SyncableDataOutputStream( + outputStream); - return new DataOutputStream(appendStream); + return dataOutStream; } catch(Exception ex) { throw new AzureException(ex); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index afb9379..84342cd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -22,122 +22,256 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Calendar; -import java.util.HashMap; -import java.util.Locale; +import java.util.Iterator; import java.util.List; import java.util.UUID; import java.util.Random; -import java.util.TimeZone; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; -import org.eclipse.jetty.util.log.Log; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.BlockEntry; import 
com.microsoft.azure.storage.blob.BlockListingFilter; +import com.microsoft.azure.storage.blob.BlockSearchMode; + +import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH; +import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC; /** - * Stream object that implememnts append for Block Blobs in WASB. + * Stream object that implements append for Block Blobs in WASB. + * + * The stream object implements hflush/hsync and block compaction. Block + * compaction is the process of replacing a sequence of small blocks with one + * big block. Azure Block blobs supports up to 50000 blocks and every + * hflush/hsync generates one block. When the number of blocks is above 32000, + * the process of compaction decreases the total number of blocks, if possible. + * If compaction is disabled, hflush/hsync are empty functions. + * + * The stream object uses background threads for uploading the blocks and the + * block blob list. Blocks can be uploaded concurrently. However, when the block + * list is uploaded, block uploading should stop. If a block is uploaded before + * the block list and the block id is not in the list, the block will be lost. + * If the block is uploaded after the block list and the block id is in the + * list, the block list upload will fail. The exclusive access for the block + * list upload is managed by uploadingSemaphore. */ -public class BlockBlobAppendStream extends OutputStream { +public class BlockBlobAppendStream extends OutputStream implements Syncable, + StreamCapabilities { + + /** + * The name of the blob/file. + */ private final String key; - private final int bufferSize; - private ByteArrayOutputStream outBuffer; - private final CloudBlockBlobWrapper blob; - private final OperationContext opContext; /** - * Variable to track if the stream has been closed. + * This variable tracks if this is new blob or existing one. 
+ */ + private boolean blobExist; + + /** + * When the blob exist, to to prevent concurrent write we take a lease. + * Taking a lease is not necessary for new blobs. */ - private boolean closed = false; + private SelfRenewingLease lease = null; /** - * Variable to track if the append lease is released. + * The support for process of compaction is optional. */ + private final boolean compactionEnabled; - private volatile boolean leaseFreed; + /** + * The number of blocks above each block compaction is triggered. + */ + private static final int DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT = 32000; /** - * Variable to track if the append stream has been - * initialized. + * The number of blocks above each block compaction is triggered. */ + private int activateCompactionBlockCount + = DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT; - private boolean initialized = false; + /** + * The size of the output buffer. Writes store the data in outBuffer until + * either the size is above maxBlockSize or hflush/hsync is called. + */ + private final AtomicInteger maxBlockSize; /** - * Last IOException encountered + * The current buffer where writes are stored. */ - private volatile IOException lastError = null; + private ByteBuffer outBuffer; /** - * List to keep track of the uncommitted azure storage - * block ids + * The size of the blob that has been successfully stored in the Azure Blob + * service. */ - private final List<BlockEntry> uncommittedBlockEntries; + private final AtomicLong committedBlobLength = new AtomicLong(0); - private static final int UNSET_BLOCKS_COUNT = -1; + /** + * Position of last block in the blob. + */ + private volatile long blobLength = 0; /** - * Variable to hold the next block id to be used for azure - * storage blocks. + * Minutes waiting before the close operation timed out. 
*/ - private long nextBlockCount = UNSET_BLOCKS_COUNT; + private static final int CLOSE_UPLOAD_DELAY = 10; /** - * Variable to hold the block id prefix to be used for azure - * storage blocks from azure-storage-java sdk version 4.2.0 onwards + * Keep alive time for the threadpool. */ - private String blockIdPrefix = null; + private static final int THREADPOOL_KEEP_ALIVE = 30; + /** + * Azure Block Blob used for the stream. + */ + private final CloudBlockBlobWrapper blob; + + /** + * Azure Storage operation context. + */ + private final OperationContext opContext; + + /** + * Commands send from client calls to the background thread pool. + */ + private abstract class UploadCommand { + + // the blob offset for the command + private final long commandBlobOffset; + + // command completion latch + private final CountDownLatch completed = new CountDownLatch(1); + + UploadCommand(long offset) { + this.commandBlobOffset = offset; + } + + long getCommandBlobOffset() { + return commandBlobOffset; + } + + void await() throws InterruptedException { + completed.await(); + } + + void awaitAsDependent() throws InterruptedException { + await(); + } + + void setCompleted() { + completed.countDown(); + } - private final Random sequenceGenerator = new Random(); + void execute() throws InterruptedException, IOException {} + + void dump() {} + } + + /** + * The list of recent commands. Before block list is committed, all the block + * listed in the list must be uploaded. activeBlockCommands is used for + * enumerating the blocks and waiting on the latch until the block is + * uploaded. + */ + private final ConcurrentLinkedQueue<UploadCommand> activeBlockCommands + = new ConcurrentLinkedQueue<>(); + + /** + * Variable to track if the stream has been closed. + */ + private volatile boolean closed = false; + + /** + * First IOException encountered. 
+ */ + private final AtomicReference<IOException> firstError + = new AtomicReference<>(); + + /** + * Flag set when the first error has been thrown. + */ + private boolean firstErrorThrown = false; /** - * Time to wait to renew lease in milliseconds + * Semaphore for serializing block uploads with NativeAzureFileSystem. + * + * The semaphore starts with number of permits equal to the number of block + * upload threads. Each block upload thread needs one permit to start the + * upload. The put block list acquires all the permits before the block list + * is committed. */ - private static final int LEASE_RENEWAL_PERIOD = 10000; + private final Semaphore uploadingSemaphore = new Semaphore( + MAX_NUMBER_THREADS_IN_THREAD_POOL, + true); /** - * Number of times to retry for lease renewal + * Queue storing buffers with the size of the Azure block ready for + * reuse. The pool allows reusing the blocks instead of allocating new + * blocks. After the data is sent to the service, the buffer is returned + * back to the queue */ - private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3; + private final ElasticByteBufferPool poolReadyByteBuffers + = new ElasticByteBufferPool(); /** - * Time to wait before retrying to set the lease + * The blob's block list. */ - private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500; + private final List<BlockEntry> blockEntries = new ArrayList<>( + DEFAULT_CAPACITY_BLOCK_ENTRIES); + + private static final int DEFAULT_CAPACITY_BLOCK_ENTRIES = 1024; /** - * Metadata key used on the blob to indicate append lease is active + * The uncommitted blob's block list. */ - public static final String APPEND_LEASE = "append_lease"; + private final ConcurrentLinkedDeque<BlockEntry> uncommittedBlockEntries + = new ConcurrentLinkedDeque<>(); + + /** + * Variable to hold the next block id to be used for azure storage blocks. 
+ */ + private static final int UNSET_BLOCKS_COUNT = -1; + private long nextBlockCount = UNSET_BLOCKS_COUNT; /** - * Timeout value for the append lease in millisecs. If the lease is not - * renewed within 30 seconds then another thread can acquire the append lease - * on the blob + * Variable to hold the block id prefix to be used for azure storage blocks. */ - public static final int APPEND_LEASE_TIMEOUT = 30000; + private String blockIdPrefix = null; /** - * Metdata key used on the blob to indicate last modified time of append lease + * Maximum number of threads in block upload thread pool. */ - public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified"; + private static final int MAX_NUMBER_THREADS_IN_THREAD_POOL = 4; /** * Number of times block upload needs is retried. @@ -145,17 +279,33 @@ public class BlockBlobAppendStream extends OutputStream { private static final int MAX_BLOCK_UPLOAD_RETRIES = 3; /** - * Wait time between block upload retries in millisecs. + * Wait time between block upload retries in milliseconds. */ private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000; - private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class); + /** + * Logger. + */ + private static final Logger LOG = + LoggerFactory.getLogger(BlockBlobAppendStream.class); + /** + * The absolute maximum of blocks for a blob. It includes committed and + * temporary blocks. + */ private static final int MAX_BLOCK_COUNT = 100000; + /** + * The upload thread pool executor. + */ private ThreadPoolExecutor ioThreadPool; /** + * Azure Storage access conditions for the blob. + */ + private final AccessCondition accessCondition = new AccessCondition(); + + /** * Atomic integer to provide thread id for thread names for uploader threads. */ private final AtomicInteger threadSequenceNumber; @@ -163,106 +313,123 @@ public class BlockBlobAppendStream extends OutputStream { /** * Prefix to be used for thread names for uploader threads. 
*/ - private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream"; - - private static final String UTC_STR = "UTC"; + private static final String THREAD_ID_PREFIX = "append-blockblob"; + /** + * BlockBlobAppendStream constructor. + * + * @param blob + * Azure Block Blob + * @param aKey + * blob's name + * @param bufferSize + * the maximum size of a blob block. + * @param compactionEnabled + * is the compaction process enabled for this blob + * @param opContext + * Azure Store operation context for the blob + * @throws IOException + * if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream cannot be used for append operations + */ public BlockBlobAppendStream(final CloudBlockBlobWrapper blob, - final String aKey, final int bufferSize, final OperationContext opContext) + final String aKey, + final int bufferSize, + final boolean compactionEnabled, + final OperationContext opContext) throws IOException { - if (null == aKey || 0 == aKey.length()) { - throw new IllegalArgumentException( - "Illegal argument: The key string is null or empty"); - } - - if (0 >= bufferSize) { - throw new IllegalArgumentException( - "Illegal argument bufferSize cannot be zero or negative"); - } - + Preconditions.checkArgument(StringUtils.isNotEmpty(aKey)); + Preconditions.checkArgument(bufferSize >= 0); this.blob = blob; this.opContext = opContext; this.key = aKey; - this.bufferSize = bufferSize; + this.maxBlockSize = new AtomicInteger(bufferSize); this.threadSequenceNumber = new AtomicInteger(0); this.blockIdPrefix = null; - setBlocksCountAndBlockIdPrefix(); + this.compactionEnabled = compactionEnabled; + this.blobExist = true; + this.outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get()); - this.outBuffer = new ByteArrayOutputStream(bufferSize); - this.uncommittedBlockEntries = new ArrayList<BlockEntry>(); - - // Acquire append lease on the blob. 
try { - //Set the append lease if the value of the append lease is false - if (!updateBlobAppendMetadata(true, false)) { - LOG.error("Unable to set Append Lease on the Blob : {} " - + "Possibly because another client already has a create or append stream open on the Blob", key); - throw new IOException("Unable to set Append lease on the Blob. " - + "Possibly because another client already had an append stream open on the Blob."); - } - } catch (StorageException ex) { - LOG.error("Encountered Storage exception while acquiring append " - + "lease on blob : {}. Storage Exception : {} ErrorCode : {}", - key, ex, ex.getErrorCode()); + // download the block list + blockEntries.addAll( + blob.downloadBlockList( + BlockListingFilter.COMMITTED, + new BlobRequestOptions(), + opContext)); + + blobLength = blob.getProperties().getLength(); + + committedBlobLength.set(blobLength); - throw new IOException(ex); + // Acquiring lease on the blob. + lease = new SelfRenewingLease(blob, true); + accessCondition.setLeaseID(lease.getLeaseID()); + + } catch (StorageException ex) { + if (ex.getErrorCode().equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)) { + blobExist = false; + } + else if (ex.getErrorCode().equals( + StorageErrorCodeStrings.LEASE_ALREADY_PRESENT)) { + throw new AzureException( + "Unable to set Append lease on the Blob: " + ex, ex); + } + else { + LOG.debug( + "Encountered storage exception." + + " StorageException : {} ErrorCode : {}", + ex, + ex.getErrorCode()); + throw new AzureException(ex); + } } - leaseFreed = false; + setBlocksCountAndBlockIdPrefix(blockEntries); + + this.ioThreadPool = new ThreadPoolExecutor( + MAX_NUMBER_THREADS_IN_THREAD_POOL, + MAX_NUMBER_THREADS_IN_THREAD_POOL, + THREADPOOL_KEEP_ALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new UploaderThreadFactory()); } /** - * Helper method that starts an Append Lease renewer thread and the - * thread pool. + * Set payload size of the stream. 
+ * It is intended to be used for unit testing purposes only. */ - public synchronized void initialize() { - - if (initialized) { - return; - } - /* - * Start the thread for Append lease renewer. - */ - Thread appendLeaseRenewer = new Thread(new AppendRenewer()); - appendLeaseRenewer.setDaemon(true); - appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key)); - appendLeaseRenewer.start(); - - /* - * Parameters to ThreadPoolExecutor: - * corePoolSize : the number of threads to keep in the pool, even if they are idle, - * unless allowCoreThreadTimeOut is set - * maximumPoolSize : the maximum number of threads to allow in the pool - * keepAliveTime - when the number of threads is greater than the core, - * this is the maximum time that excess idle threads will - * wait for new tasks before terminating. - * unit - the time unit for the keepAliveTime argument - * workQueue - the queue to use for holding tasks before they are executed - * This queue will hold only the Runnable tasks submitted by the execute method. - */ - this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory()); + @VisibleForTesting + synchronized void setMaxBlockSize(int size) { + maxBlockSize.set(size); - initialized = true; + // it is for testing only so we can abandon the previously allocated + // payload + this.outBuffer = ByteBuffer.allocate(maxBlockSize.get()); } /** - * Get the blob name. - * - * @return String Blob name. + * Set compaction parameters. + * It is intended to be used for unit testing purposes only. */ - public String getKey() { - return key; + @VisibleForTesting + void setCompactionBlockCount(int activationCount) { + activateCompactionBlockCount = activationCount; } /** - * Get the backing blob. - * @return buffer size of the stream. + * Get the list of block entries. It is used for testing purposes only. + * @return List of block entries. 
*/ - public int getBufferSize() { - return bufferSize; + @VisibleForTesting + List<BlockEntry> getBlockList() throws StorageException, IOException { + return blob.downloadBlockList( + BlockListingFilter.COMMITTED, + new BlobRequestOptions(), + opContext); } /** @@ -283,21 +450,6 @@ public class BlockBlobAppendStream extends OutputStream { } /** - * Writes b.length bytes from the specified byte array to this output stream. - * - * @param data - * the byte array to write. - * - * @throws IOException - * if an I/O error occurs. In particular, an IOException may be - * thrown if the output stream has been closed. - */ - @Override - public void write(final byte[] data) throws IOException { - write(data, 0, data.length); - } - - /** * Writes length bytes from the specified byte array starting at offset to * this output stream. * @@ -312,529 +464,678 @@ public class BlockBlobAppendStream extends OutputStream { * thrown if the output stream has been closed. */ @Override - public void write(final byte[] data, final int offset, final int length) + public synchronized void write(final byte[] data, int offset, int length) throws IOException { + Preconditions.checkArgument(data != null, "null data"); if (offset < 0 || length < 0 || length > data.length - offset) { - throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments"); - } - - writeInternal(data, offset, length); - } - - @Override - public synchronized void close() throws IOException { - - if (!initialized) { - throw new IOException("Trying to close an uninitialized Append stream"); + throw new IndexOutOfBoundsException(); } if (closed) { - return; + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - if (leaseFreed) { - throw new IOException(String.format("Attempting to close an append stream on blob : %s " - + " that does not have lease on the Blob. 
Failing close", key)); - } + while (outBuffer.remaining() < length) { + + int remaining = outBuffer.remaining(); + outBuffer.put(data, offset, remaining); + + // upload payload to azure storage + addBlockUploadCommand(); - if (outBuffer.size() > 0) { - uploadBlockToStorage(outBuffer.toByteArray()); + offset += remaining; + length -= remaining; } - ioThreadPool.shutdown(); + outBuffer.put(data, offset, length); + } - try { - if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) { - LOG.error("Time out occurred while waiting for IO request to finish in append" - + " for blob : {}", key); - NativeAzureFileSystemHelper.logAllLiveStackTraces(); - throw new IOException("Timed out waiting for IO requests to finish"); - } - } catch(InterruptedException intrEx) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key); - throw new IOException("Append Commit interrupted."); - } + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { - // Calling commit after all blocks are succesfully uploaded. - if (lastError == null) { - commitAppendBlocks(); + if (closed) { + // calling close() after the stream is closed starts with call to flush() + return; } - // Perform cleanup. - cleanup(); + addBlockUploadCommand(); - if (lastError != null) { - throw lastError; + if (committedBlobLength.get() < blobLength) { + try { + // wait until the block list is committed + addFlushCommand().await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } } /** - * Helper method that cleans up the append stream. + * Force all data in the output stream to be written to Azure storage. 
+ * Wait to return until this is complete. */ - private synchronized void cleanup() { - - closed = true; - - try { - // Set the value of append lease to false if the value is set to true. - updateBlobAppendMetadata(false, true); - } catch(StorageException ex) { - LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} " - + "Error Code : {}", - key, ex, ex.getErrorCode()); - lastError = new IOException(ex); + @Override + public void hsync() throws IOException { + // when block compaction is disabled, hsync is empty function + if (compactionEnabled) { + flush(); } + } - leaseFreed = true; + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. + */ + @Override + public void hflush() throws IOException { + // when block compaction is disabled, hflush is empty function + if (compactionEnabled) { + flush(); + } } /** - * Method to commit all the uncommited blocks to azure storage. - * If the commit fails then blocks are automatically cleaned up - * by Azure storage. - * @throws IOException + * The Synchronization capabilities of this stream depend upon the compaction + * policy. + * @param capability string to query the stream support for. + * @return true for hsync and hflush when compaction is enabled. */ - private synchronized void commitAppendBlocks() throws IOException { + @Override + public boolean hasCapability(String capability) { + return compactionEnabled + && (capability.equalsIgnoreCase(HSYNC.getValue()) + || capability.equalsIgnoreCase((HFLUSH.getValue()))); + } - SelfRenewingLease lease = null; + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close the access to the stream and + * shutdown the upload thread pool. + * If the blob was created, its lease will be released. + * Any error encountered caught in threads and stored will be rethrown here + * after cleanup. 
+ */ + @Override + public synchronized void close() throws IOException { - try { - if (uncommittedBlockEntries.size() > 0) { + LOG.debug("close {} ", key); - //Acquiring lease on the blob. - lease = new SelfRenewingLease(blob); + if (closed) { + return; + } - // Downloading existing blocks - List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED, - new BlobRequestOptions(), opContext); + // Upload the last block regardless of compactionEnabled flag + flush(); - // Adding uncommitted blocks. - blockEntries.addAll(uncommittedBlockEntries); + // Initiates an orderly shutdown in which previously submitted tasks are + // executed. + ioThreadPool.shutdown(); - AccessCondition accessCondition = new AccessCondition(); - accessCondition.setLeaseID(lease.getLeaseID()); - blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext); - uncommittedBlockEntries.clear(); + try { + // wait up to CLOSE_UPLOAD_DELAY minutes to upload all the blocks + if (!ioThreadPool.awaitTermination(CLOSE_UPLOAD_DELAY, TimeUnit.MINUTES)) { + LOG.error("Time out occurred while close() is waiting for IO request to" + + " finish in append" + + " for blob : {}", + key); + NativeAzureFileSystemHelper.logAllLiveStackTraces(); + throw new AzureException("Timed out waiting for IO requests to finish"); } - } catch(StorageException ex) { - LOG.error("Storage exception encountered during block commit phase of append for blob" - + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode()); - throw new IOException("Encountered Exception while committing append blocks", ex); - } finally { - if (lease != null) { + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + // release the lease + if (firstError.get() == null && blobExist) { try { lease.free(); - } catch(StorageException ex) { - LOG.debug("Exception encountered while releasing lease for " - + "blob : {} StorageException : {} ErrorCode : {}", key, ex, 
ex.getErrorCode()); - // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object. + } catch (StorageException ex) { + LOG.debug("Lease free update blob {} encountered Storage Exception:" + + " {} Error Code : {}", + key, + ex, + ex.getErrorCode()); + maybeSetFirstError(new AzureException(ex)); } - } + } + + closed = true; + + // finally, throw the first exception raised if it has not + // been thrown elsewhere. + if (firstError.get() != null && !firstErrorThrown) { + throw firstError.get(); } } /** - * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure - * storage SDK. + * Helper method used to generate the blockIDs. The algorithm used is similar + * to the Azure storage SDK. */ - private void setBlocksCountAndBlockIdPrefix() throws IOException { + private void setBlocksCountAndBlockIdPrefix(List<BlockEntry> blockEntries) { - try { + if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix == null) { - if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) { + Random sequenceGenerator = new Random(); - List<BlockEntry> blockEntries = - blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext); + String blockZeroBlockId = (!blockEntries.isEmpty()) + ? blockEntries.get(0).getId() + : ""; + String prefix = UUID.randomUUID().toString() + "-"; + String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, + 0); - String blockZeroBlockId = (blockEntries.size() > 0) ? 
blockEntries.get(0).getId() : ""; - String prefix = UUID.randomUUID().toString() + "-"; - String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0); + if (!blockEntries.isEmpty() + && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) { - if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) { + // If blob has already been created with 2.2.0, append subsequent blocks + // with older version (2.2.0) blockId compute nextBlockCount, the way it + // was done before; and don't use blockIdPrefix + this.blockIdPrefix = ""; + nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE)) + + sequenceGenerator.nextInt( + Integer.MAX_VALUE - MAX_BLOCK_COUNT); + nextBlockCount += blockEntries.size(); - // If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId - // compute nextBlockCount, the way it was done before; and don't use blockIdPrefix - this.blockIdPrefix = ""; - nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE)) - + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT); - nextBlockCount += blockEntries.size(); - - } else { - - // If there are no existing blocks, create the first block with newer version (4.2.0) blockId - // If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId - this.blockIdPrefix = prefix; - nextBlockCount = blockEntries.size(); - - } + } else { + // If there are no existing blocks, create the first block with newer + // version (4.2.0) blockId. If blob has already been created with 4.2.0, + // append subsequent blocks with newer version (4.2.0) blockId + this.blockIdPrefix = prefix; + nextBlockCount = blockEntries.size(); } - - } catch (StorageException ex) { - LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix." 
- + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode()); - throw new IOException(ex); } } /** - * Helper method that generates the next block id for uploading a block to azure storage. + * Helper method that generates the next block id for uploading a block to + * azure storage. * @return String representing the block ID generated. - * @throws IOException + * @throws IOException if the stream is in invalid state */ private String generateBlockId() throws IOException { - if (nextBlockCount == UNSET_BLOCKS_COUNT) { - throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly"); - } - - if (this.blockIdPrefix == null) { - throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly"); - } - - if (!this.blockIdPrefix.equals("")) { - - return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++); - - } else { - - return generateOlderVersionBlockId(nextBlockCount++); - + if (nextBlockCount == UNSET_BLOCKS_COUNT || blockIdPrefix == null) { + throw new AzureException( + "Append Stream in invalid state. nextBlockCount not set correctly"); } + return (!blockIdPrefix.isEmpty()) + ? generateNewerVersionBlockId(blockIdPrefix, nextBlockCount++) + : generateOlderVersionBlockId(nextBlockCount++); } /** - * Helper method that generates an older (2.2.0) version blockId + * Helper method that generates an older (2.2.0) version blockId. * @return String representing the block ID generated. 
*/ private String generateOlderVersionBlockId(long id) { - byte[] blockIdInBytes = getBytesFromLong(id); - return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); + byte[] blockIdInBytes = new byte[8]; + for (int m = 0; m < 8; m++) { + blockIdInBytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF); + } + + return new String( + Base64.encodeBase64(blockIdInBytes), + StandardCharsets.UTF_8); } /** - * Helper method that generates an newer (4.2.0) version blockId + * Helper method that generates an newer (4.2.0) version blockId. * @return String representing the block ID generated. */ private String generateNewerVersionBlockId(String prefix, long id) { String blockIdSuffix = String.format("%06d", id); - byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8); + byte[] blockIdInBytes = + (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8); return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); } /** - * Returns a byte array that represents the data of a <code>long</code> value. This - * utility method is copied from com.microsoft.azure.storage.core.Utility class. - * This class is marked as internal, hence we clone the method here and not express - * dependency on the Utility Class - * - * @param value - * The value from which the byte array will be returned. - * - * @return A byte array that represents the data of the specified <code>long</code> value. + * This is shared between upload block Runnable and CommitBlockList. 
The + * method captures retry logic + * @param blockId block name + * @param dataPayload block content */ - private static byte[] getBytesFromLong(final long value) { + private void writeBlockRequestInternal(String blockId, + ByteBuffer dataPayload, + boolean bufferPoolBuffer) { + IOException lastLocalException = null; + + int uploadRetryAttempts = 0; + while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) { + try { + long startTime = System.nanoTime(); + + blob.uploadBlock(blockId, accessCondition, new ByteArrayInputStream( + dataPayload.array()), dataPayload.position(), + new BlobRequestOptions(), opContext); - final byte[] tempArray = new byte[8]; + LOG.debug("upload block finished for {} ms. block {} ", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTime), blockId); + break; + + } catch(Exception ioe) { + LOG.debug("Encountered exception during uploading block for Blob {}" + + " Exception : {}", key, ioe); + uploadRetryAttempts++; + lastLocalException = new AzureException( + "Encountered Exception while uploading block: " + ioe, ioe); + try { + Thread.sleep( + BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1)); + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } - for (int m = 0; m < 8; m++) { - tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF); + if (bufferPoolBuffer) { + poolReadyByteBuffers.putBuffer(dataPayload); } - return tempArray; + if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) { + maybeSetFirstError(lastLocalException); + } } /** - * Helper method that creates a thread to upload a block to azure storage. - * @param payload - * @throws IOException + * Set {@link #firstError} to the exception if it is not already set. 
+ * @param exception exception to save */ - private synchronized void uploadBlockToStorage(byte[] payload) - throws IOException { - - // upload payload to azure storage - String blockId = generateBlockId(); - - // Since uploads of the Azure storage are done in parallel threads, we go ahead - // add the blockId in the uncommitted list. If the upload of the block fails - // we don't commit the blockIds. - BlockEntry blockEntry = new BlockEntry(blockId); - blockEntry.setSize(payload.length); - uncommittedBlockEntries.add(blockEntry); - ioThreadPool.execute(new WriteRequest(payload, blockId)); + private void maybeSetFirstError(IOException exception) { + firstError.compareAndSet(null, exception); } /** - * Helper method to updated the Blob metadata during Append lease operations. - * Blob metadata is updated to holdLease value only if the current lease - * status is equal to testCondition and the last update on the blob metadata - * is less that 30 secs old. - * @param holdLease - * @param testCondition - * @return true if the updated lease operation was successful or false otherwise - * @throws StorageException + * Throw the first error caught if it has not been raised already + * @throws IOException if one is caught and needs to be thrown. */ - private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition) - throws StorageException { - - SelfRenewingLease lease = null; - StorageException lastStorageException = null; - int leaseRenewalRetryCount = 0; - - /* - * Updating the Blob metadata honours following algorithm based on - * 1) If the append lease metadata is present - * 2) Last updated time of the append lease - * 3) Previous value of the Append lease metadata. - * - * The algorithm: - * 1) If append lease metadata is not part of the Blob. In this case - * this is the first client to Append so we update the metadata. - * 2) If append lease metadata is present and timeout has occurred. 
- * In this case irrespective of what the value of the append lease is we update the metadata. - * 3) If append lease metadata is present and is equal to testCondition value (passed as parameter) - * and timeout has not occurred, we update the metadata. - * 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter) - * and timeout has not occurred, we do not update metadata and return false. - * - */ - while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) { - - lastStorageException = null; - - synchronized(this) { - try { - - final Calendar currentCalendar = Calendar - .getInstance(Locale.US); - currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR)); - long currentTime = currentCalendar.getTime().getTime(); + private void maybeThrowFirstError() throws IOException { + if (firstError.get() != null) { + firstErrorThrown = true; + throw firstError.get(); + } + } - // Acquire lease on the blob. - lease = new SelfRenewingLease(blob); + /** + * Write block list. 
The method captures retry logic + */ + private void writeBlockListRequestInternal() { - blob.downloadAttributes(opContext); - HashMap<String, String> metadata = blob.getMetadata(); + IOException lastLocalException = null; - if (metadata.containsKey(APPEND_LEASE) - && currentTime - Long.parseLong( - metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT - && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) { - return false; - } + int uploadRetryAttempts = 0; + while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) { + try { - metadata.put(APPEND_LEASE, Boolean.toString(holdLease)); - metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime)); - blob.setMetadata(metadata); - AccessCondition accessCondition = new AccessCondition(); - accessCondition.setLeaseID(lease.getLeaseID()); - blob.uploadMetadata(accessCondition, null, opContext); - return true; + long startTime = System.nanoTime(); - } catch (StorageException ex) { - - lastStorageException = ex; - LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} " - + "Error Code : {}", - key, ex, ex.getErrorCode()); - leaseRenewalRetryCount++; - - } finally { - - if (lease != null) { - try { - lease.free(); - } catch(StorageException ex) { - LOG.debug("Encountered Storage exception while releasing lease for Blob {} " - + "during Append metadata operation. 
Storage Exception {} " - + "Error Code : {} ", key, ex, ex.getErrorCode()); - } finally { - lease = null; - } - } - } - } + blob.commitBlockList(blockEntries, accessCondition, + new BlobRequestOptions(), opContext); - if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) { - throw lastStorageException; - } else { + LOG.debug("Upload block list took {} ms for blob {} ", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTime), key); + break; + + } catch(Exception ioe) { + LOG.debug("Encountered exception during uploading block for Blob {}" + + " Exception : {}", key, ioe); + uploadRetryAttempts++; + lastLocalException = new AzureException( + "Encountered Exception while uploading block: " + ioe, ioe); try { - Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD); - } catch(InterruptedException ex) { - LOG.debug("Blob append metadata updated method interrupted"); + Thread.sleep( + BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1)); + } catch(InterruptedException ie) { Thread.currentThread().interrupt(); + break; } } } - // The code should not enter here because the while loop will - // always be executed and if the while loop is executed we - // would returning from the while loop. - return false; + if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) { + maybeSetFirstError(lastLocalException); + } } /** - * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer. - * @param data - * @param offset - * @param length - * @throws IOException + * A ThreadFactory that creates uploader thread with + * meaningful names helpful for debugging purposes. 
*/ - private synchronized void writeInternal(final byte[] data, final int offset, final int length) - throws IOException { + class UploaderThreadFactory implements ThreadFactory { - if (!initialized) { - throw new IOException("Trying to write to an un-initialized Append stream"); + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(String.format("%s-%d", THREAD_ID_PREFIX, + threadSequenceNumber.getAndIncrement())); + return t; } + } - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } + /** + * Upload block commands. + */ + private class UploadBlockCommand extends UploadCommand { - if (leaseFreed) { - throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write")); - } + // the block content for upload + private final ByteBuffer payload; - byte[] currentData = new byte[length]; - System.arraycopy(data, offset, currentData, 0, length); + // description of the block + private final BlockEntry entry; - // check to see if the data to be appended exceeds the - // buffer size. If so we upload a block to azure storage. - while ((outBuffer.size() + currentData.length) > bufferSize) { + UploadBlockCommand(String blockId, ByteBuffer payload) { - byte[] payload = new byte[bufferSize]; + super(blobLength); - // Add data from the existing buffer - System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size()); + BlockEntry blockEntry = new BlockEntry(blockId); + blockEntry.setSize(payload.position()); + blockEntry.setSearchMode(BlockSearchMode.LATEST); - // Updating the available size in the payload - int availableSpaceInPayload = bufferSize - outBuffer.size(); + this.payload = payload; + this.entry = blockEntry; - // Adding data from the current call - System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload); + uncommittedBlockEntries.add(blockEntry); + } - uploadBlockToStorage(payload); + /** + * Execute command. 
+ */ + void execute() throws InterruptedException { + + uploadingSemaphore.acquire(1); + writeBlockRequestInternal(entry.getId(), payload, true); + uploadingSemaphore.release(1); - // updating the currentData buffer - byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload]; - System.arraycopy(currentData, availableSpaceInPayload, - tempBuffer, 0, currentData.length - availableSpaceInPayload); - currentData = tempBuffer; - outBuffer = new ByteArrayOutputStream(bufferSize); } - outBuffer.write(currentData); + void dump() { + LOG.debug("upload block {} size: {} for blob {}", + entry.getId(), + entry.getSize(), + key); + } } /** - * Runnable instance that uploads the block of data to azure storage. - * - * + * Upload blob block list commands. */ - private class WriteRequest implements Runnable { - private final byte[] dataPayload; - private final String blockId; + private class UploadBlockListCommand extends UploadCommand { + + private BlockEntry lastBlock = null; + + UploadBlockListCommand() { + super(blobLength); - public WriteRequest(byte[] dataPayload, String blockId) { - this.dataPayload = dataPayload; - this.blockId = blockId; + if (!uncommittedBlockEntries.isEmpty()) { + lastBlock = uncommittedBlockEntries.getLast(); + } } - @Override - public void run() { + void awaitAsDependent() throws InterruptedException { + // empty. later commit block does not need to wait previous commit block + // lists. + } - int uploadRetryAttempts = 0; - IOException lastLocalException = null; - while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) { - try { + void dump() { + LOG.debug("commit block list with {} blocks for blob {}", + uncommittedBlockEntries.size(), key); + } + + /** + * Execute command. 
+ */ + public void execute() throws InterruptedException, IOException { + + if (committedBlobLength.get() >= getCommandBlobOffset()) { + LOG.debug("commit already applied for {}", key); + return; + } + + if (lastBlock == null) { + LOG.debug("nothing to commit for {}", key); + return; + } + + LOG.debug("active commands: {} for {}", activeBlockCommands.size(), key); + + for (UploadCommand activeCommand : activeBlockCommands) { + if (activeCommand.getCommandBlobOffset() < getCommandBlobOffset()) { + activeCommand.dump(); + activeCommand.awaitAsDependent(); + } else { + break; + } + } + + // stop all uploads until the block list is committed + uploadingSemaphore.acquire(MAX_NUMBER_THREADS_IN_THREAD_POOL); + + BlockEntry uncommittedBlock; + do { + uncommittedBlock = uncommittedBlockEntries.poll(); + blockEntries.add(uncommittedBlock); + } while (uncommittedBlock != lastBlock); + + if (blockEntries.size() > activateCompactionBlockCount) { + LOG.debug("Block compaction: activated with {} blocks for {}", + blockEntries.size(), key); + + // Block compaction + long startCompaction = System.nanoTime(); + blockCompaction(); + LOG.debug("Block compaction finished for {} ms with {} blocks for {}", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startCompaction), + blockEntries.size(), key); + } + + writeBlockListRequestInternal(); + + uploadingSemaphore.release(MAX_NUMBER_THREADS_IN_THREAD_POOL); - blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload), - dataPayload.length, new BlobRequestOptions(), opContext); + // remove blocks previous commands + for (Iterator<UploadCommand> it = activeBlockCommands.iterator(); + it.hasNext();) { + UploadCommand activeCommand = it.next(); + if (activeCommand.getCommandBlobOffset() <= getCommandBlobOffset()) { + it.remove(); + } else { break; - } catch(Exception ioe) { - Log.getLog().debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe); - uploadRetryAttempts++; - lastLocalException = new 
IOException("Encountered Exception while uploading block", ioe); - try { - Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL); - } catch(InterruptedException ie) { - Thread.currentThread().interrupt(); - break; + } + } + + committedBlobLength.set(getCommandBlobOffset()); + } + + /** + * Internal output stream with read access to the internal buffer. + */ + private class ByteArrayOutputStreamInternal extends ByteArrayOutputStream { + + ByteArrayOutputStreamInternal(int size) { + super(size); + } + + byte[] getByteArray() { + return buf; + } + } + + /** + * Block compaction process. + * + * Block compaction is only enabled when the number of blocks exceeds + * activateCompactionBlockCount. The algorithm searches for the longest + * segment [b..e) where (e-b) > 2 && |b| + |b+1| ... |e-1| < maxBlockSize + * such that size(b1) + size(b2) + ... + size(bn) < maximum-block-size. + * It then downloads the blocks in the sequence, concatenates the data to + * form a single block, uploads this new block, and updates the block + * list to replace the sequence of blocks with the new block. 
+ */ + private void blockCompaction() throws IOException { + //current segment [segmentBegin, segmentEnd) and file offset/size of the + // current segment + int segmentBegin = 0, segmentEnd = 0; + long segmentOffsetBegin = 0, segmentOffsetEnd = 0; + + //longest segment [maxSegmentBegin, maxSegmentEnd) and file offset/size of + // the longest segment + int maxSegmentBegin = 0, maxSegmentEnd = 0; + long maxSegmentOffsetBegin = 0, maxSegmentOffsetEnd = 0; + + for (BlockEntry block : blockEntries) { + segmentEnd++; + segmentOffsetEnd += block.getSize(); + if (segmentOffsetEnd - segmentOffsetBegin > maxBlockSize.get()) { + if (segmentEnd - segmentBegin > 2) { + if (maxSegmentEnd - maxSegmentBegin < segmentEnd - segmentBegin) { + maxSegmentBegin = segmentBegin; + maxSegmentEnd = segmentEnd; + maxSegmentOffsetBegin = segmentOffsetBegin; + maxSegmentOffsetEnd = segmentOffsetEnd - block.getSize(); + } } + segmentBegin = segmentEnd - 1; + segmentOffsetBegin = segmentOffsetEnd - block.getSize(); } } - if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) { - lastError = lastLocalException; + if (maxSegmentEnd - maxSegmentBegin > 1) { + + LOG.debug("Block compaction: {} blocks for {}", + maxSegmentEnd - maxSegmentBegin, key); + + // download synchronously all the blocks from the azure storage + ByteArrayOutputStreamInternal blockOutputStream + = new ByteArrayOutputStreamInternal(maxBlockSize.get()); + + try { + long length = maxSegmentOffsetEnd - maxSegmentOffsetBegin; + blob.downloadRange(maxSegmentOffsetBegin, length, blockOutputStream, + new BlobRequestOptions(), opContext); + } catch(StorageException ex) { + LOG.error( + "Storage exception encountered during block compaction phase" + + " : {} Storage Exception : {} Error Code: {}", + key, ex, ex.getErrorCode()); + throw new AzureException( + "Encountered Exception while committing append blocks " + ex, ex); + } + + // upload synchronously new block to the azure storage + String blockId = generateBlockId(); + + ByteBuffer 
byteBuffer = ByteBuffer.wrap( + blockOutputStream.getByteArray()); + byteBuffer.position(blockOutputStream.size()); + + writeBlockRequestInternal(blockId, byteBuffer, false); + + // replace blocks from the longest segment with new block id + blockEntries.subList(maxSegmentBegin + 1, maxSegmentEnd - 1).clear(); + BlockEntry newBlock = blockEntries.get(maxSegmentBegin); + newBlock.setId(blockId); + newBlock.setSearchMode(BlockSearchMode.LATEST); + newBlock.setSize(maxSegmentOffsetEnd - maxSegmentOffsetBegin); } } } /** - * A ThreadFactory that creates uploader thread with - * meaningful names helpful for debugging purposes. + * Prepare block upload command and queue the command in thread pool executor. */ - class UploaderThreadFactory implements ThreadFactory { + private synchronized void addBlockUploadCommand() throws IOException { + + maybeThrowFirstError(); + + if (blobExist && lease.isFreed()) { + throw new AzureException(String.format( + "Attempting to upload a block on blob : %s " + + " that does not have lease on the Blob. Failing upload", key)); + } + + int blockSize = outBuffer.position(); + if (blockSize > 0) { + UploadCommand command = new UploadBlockCommand(generateBlockId(), + outBuffer); + activeBlockCommands.add(command); + + blobLength += blockSize; + outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get()); + + ioThreadPool.execute(new WriteRequest(command)); - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key, - threadSequenceNumber.getAndIncrement())); - return t; } } /** - * A deamon thread that renews the Append lease on the blob. - * The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing - * the lease. If an error is encountered while renewing the lease - * then an lease is released by this thread, which fails all other - * operations. + * Prepare block list commit command and queue the command in thread pool + * executor. 
*/ - private class AppendRenewer implements Runnable { + private synchronized UploadCommand addFlushCommand() throws IOException { - @Override - public void run() { + maybeThrowFirstError(); - while (!leaseFreed) { + if (blobExist && lease.isFreed()) { + throw new AzureException( + String.format("Attempting to upload block list on blob : %s" + + " that does not have lease on the Blob. Failing upload", key)); + } - try { - Thread.sleep(LEASE_RENEWAL_PERIOD); - } catch (InterruptedException ie) { - LOG.debug("Appender Renewer thread interrupted"); - Thread.currentThread().interrupt(); - } + UploadCommand command = new UploadBlockListCommand(); + activeBlockCommands.add(command); - Log.getLog().debug("Attempting to renew append lease on {}", key); + ioThreadPool.execute(new WriteRequest(command)); - try { - if (!leaseFreed) { - // Update the blob metadata to renew the append lease - if (!updateBlobAppendMetadata(true, true)) { - LOG.error("Unable to re-acquire append lease on the Blob {} ", key); - leaseFreed = true; - } - } - } catch (StorageException ex) { + return command; + } - LOG.debug("Lease renewal for Blob : {} encountered " - + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode()); + /** + * Runnable instance that uploads the block of data to azure storage. + */ + private class WriteRequest implements Runnable { + private final UploadCommand command; - // We swallow the exception here because if the blob metadata is not updated for - // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and - // continue forward if it needs to append. 
- leaseFreed = true; - } + WriteRequest(UploadCommand command) { + this.command = command; + } + + @Override + public void run() { + + try { + command.dump(); + long startTime = System.nanoTime(); + command.execute(); + command.setCompleted(); + LOG.debug("command finished for {} ms", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTime)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Exception ex) { + LOG.debug( + "Encountered exception during execution of command for Blob :" + + " {} Exception : {}", key, ex); + firstError.compareAndSet(null, new AzureException(ex)); } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 0bde124..280c0e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -62,6 +62,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; import org.apache.hadoop.fs.azure.security.Constants; @@ -352,9 +354,9 @@ public class NativeAzureFileSystem extends FileSystem { } /** - * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote + * This is an exact copy of 
org.codehaus.jettison.json.JSONObject.quote * method. - * + * * Produce a string in double quotes with backslash sequences in all the * right places. A backslash will be inserted within </, allowing JSON * text to be delivered in HTML. In JSON text, a string cannot contain a @@ -947,11 +949,11 @@ public class NativeAzureFileSystem extends FileSystem { } } - private class NativeAzureFsOutputStream extends OutputStream { - // We should not override flush() to actually close current block and flush - // to DFS, this will break applications that assume flush() is a no-op. - // Applications are advised to use Syncable.hflush() for that purpose. - // NativeAzureFsOutputStream needs to implement Syncable if needed. + /** + * Azure output stream; wraps an inner stream of different types. + */ + public class NativeAzureFsOutputStream extends OutputStream + implements Syncable, StreamCapabilities { private String key; private String keyEncoded; private OutputStream out; @@ -983,6 +985,48 @@ public class NativeAzureFileSystem extends FileSystem { setEncodedKey(anEncodedKey); } + /** + * Get a reference to the wrapped output stream. + * + * @return the underlying output stream + */ + @InterfaceAudience.LimitedPrivate({"HDFS"}) + public OutputStream getOutStream() { + return out; + } + + @Override // Syncable + public void hflush() throws IOException { + if (out instanceof Syncable) { + ((Syncable) out).hflush(); + } else { + flush(); + } + } + + @Override // Syncable + public void hsync() throws IOException { + if (out instanceof Syncable) { + ((Syncable) out).hsync(); + } else { + flush(); + } + } + + /** + * Propagate probe of stream capabilities to nested stream + * (if supported), else return false. + * @param capability string to query the stream support for. + * @return true if the nested stream supports the specific capability. 
+ */ + @Override // StreamCapability + public boolean hasCapability(String capability) { + if (out instanceof StreamCapabilities) { + return ((StreamCapabilities) out).hasCapability(capability); + } + return false; + } + @Override public synchronized void close() throws IOException { if (out != null) { @@ -990,8 +1034,11 @@ public class NativeAzureFileSystem extends FileSystem { // before returning to the caller. // out.close(); - restoreKey(); - out = null; + try { + restoreKey(); + } finally { + out = null; + } } } @@ -1045,10 +1092,10 @@ public class NativeAzureFileSystem extends FileSystem { /** * Writes <code>len</code> from the specified byte array starting at offset * <code>off</code> to the output stream. The general contract for write(b, - * off, len) is that some of the bytes in the array <code> - * b</code b> are written to the output stream in order; element - * <code>b[off]</code> is the first byte written and - * <code>b[off+len-1]</code> is the last byte written by this operation. + * off, len) is that some of the bytes in the array <code>b</code> + * are written to the output stream in order; element <code>b[off]</code> + * is the first byte written and <code>b[off+len-1]</code> is the last + * byte written by this operation. * * @param b * Byte array to be written. @@ -1749,7 +1796,7 @@ public class NativeAzureFileSystem extends FileSystem { OutputStream bufOutStream; if (store.isPageBlobKey(key)) { // Store page blobs directly in-place without renames. - bufOutStream = store.storefile(key, permissionStatus); + bufOutStream = store.storefile(key, permissionStatus, key); } else { // This is a block blob, so open the output blob stream based on the // encoded key. @@ -1777,7 +1824,7 @@ public class NativeAzureFileSystem extends FileSystem { // these // blocks. 
bufOutStream = new NativeAzureFsOutputStream(store.storefile( - keyEncoded, permissionStatus), key, keyEncoded); + keyEncoded, permissionStatus, key), key, keyEncoded); } // Construct the data output stream from the buffered output stream. FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 1c7309f..57a729d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -50,8 +50,9 @@ interface NativeFileSystemStore { InputStream retrieve(String key, long byteRangeStart) throws IOException; - DataOutputStream storefile(String key, PermissionStatus permissionStatus) - throws AzureException; + DataOutputStream storefile(String keyEncoded, + PermissionStatus permissionStatus, + String key) throws AzureException; boolean isPageBlobKey(String key); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java index 5dbb6bc..7c2722e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java @@ -519,7 +519,7 @@ public class SecureStorageInterfaceImpl extends StorageInterface { @Override public SelfRenewingLease acquireLease() throws StorageException { - return new SelfRenewingLease(this); + return new SelfRenewingLease(this, false); } } @@ -557,10 +557,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface { } @Override - public void uploadBlock(String blockId, InputStream sourceStream, + public void uploadBlock(String blockId, AccessCondition accessCondition, + InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException { - ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext); + ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, + accessCondition, options, opContext); } @Override @@ -593,4 +595,4 @@ public class SecureStorageInterfaceImpl extends StorageInterface { null, options, opContext); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java index 00d5e99..10956f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java @@ -30,6 +30,8 @@ import com.microsoft.azure.storage.blob.CloudBlob; import java.util.concurrent.atomic.AtomicInteger; +import static com.microsoft.azure.storage.StorageErrorCodeStrings.LEASE_ALREADY_PRESENT; + /** * An Azure blob lease that 
automatically renews itself indefinitely * using a background thread. Use it to synchronize distributed processes, @@ -66,7 +68,7 @@ public class SelfRenewingLease { @VisibleForTesting static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000; - public SelfRenewingLease(CloudBlobWrapper blobWrapper) + public SelfRenewingLease(CloudBlobWrapper blobWrapper, boolean throwIfPresent) throws StorageException { this.leaseFreed = false; @@ -79,10 +81,14 @@ public class SelfRenewingLease { leaseID = blob.acquireLease(LEASE_TIMEOUT, null); } catch (StorageException e) { + if (throwIfPresent && e.getErrorCode().equals(LEASE_ALREADY_PRESENT)) { + throw e; + } + // Throw again if we don't want to keep waiting. // We expect it to be that the lease is already present, // or in some cases that the blob does not exist. - if (!"LeaseAlreadyPresent".equals(e.getErrorCode())) { + if (!LEASE_ALREADY_PRESENT.equals(e.getErrorCode())) { LOG.info( "Caught exception when trying to get lease on blob " + blobWrapper.getUri().toString() + ". " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index 8b6b082..e03d731 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -665,6 +665,7 @@ abstract class StorageInterface { * * @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob * the length of all Block IDs must be identical. + * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob. 
* @param sourceStream An {@link InputStream} object that represents the input stream to write to the * block blob. * @param length A long which represents the length, in bytes, of the stream data, @@ -678,7 +679,7 @@ abstract class StorageInterface { * @throws IOException If an I/O error occurred. * @throws StorageException If a storage service error occurred. */ - void uploadBlock(String blockId, InputStream sourceStream, + void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index d3d0370..41a4dbb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -277,7 +277,7 @@ class StorageInterfaceImpl extends StorageInterface { return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath)); } - + @Override public CloudBlobWrapper getPageBlobReference(String relativePath) throws URISyntaxException, StorageException { @@ -286,7 +286,7 @@ class StorageInterfaceImpl extends StorageInterface { } } - + abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper { private final CloudBlob blob; @@ -441,10 +441,10 @@ class StorageInterfaceImpl extends StorageInterface { @Override public SelfRenewingLease acquireLease() throws StorageException { - return new SelfRenewingLease(this); + return new SelfRenewingLease(this, false); } } - + // 
// CloudBlockBlobWrapperImpl @@ -479,10 +479,10 @@ class StorageInterfaceImpl extends StorageInterface { } @Override - public void uploadBlock(String blockId, InputStream sourceStream, + public void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException { - ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext); + ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, accessCondition, options, opContext); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java index a52fdb7..fc8796b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.classification.InterfaceAudience; /** * Support the Syncable interface on top of a DataOutputStream. @@ -38,6 +39,16 @@ public class SyncableDataOutputStream extends DataOutputStream super(out); } + /** + * Get a reference to the wrapped output stream. 
+ * + * @return the underlying output stream + */ + @InterfaceAudience.LimitedPrivate({"HDFS"}) + public OutputStream getOutStream() { + return out; + } + @Override public boolean hasCapability(String capability) { if (out instanceof StreamCapabilities) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md index 758650d..466bf0b 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md @@ -153,6 +153,40 @@ line argument: ``` +### Block Blob with Compaction Support and Configuration + +Block blobs are the default kind of blob and are good for most big-data use +cases. However, block blobs have a strict limit of 50,000 blocks per blob. +To avoid reaching that limit, WASB by default does not upload a new block to +the service after every `hflush()` or `hsync()`. + +In most cases, combining data from multiple `write()` calls into +blocks of 4 MB is a good optimization. In other cases, such as HBase log files, +every call to `hflush()` or `hsync()` must upload the data to the service. + +Block blobs with compaction upload the data to the cloud service after every +`hflush()`/`hsync()`. To mitigate the limit of 50,000 blocks, +`hflush()`/`hsync()` runs the compaction process if the number of blocks in the blob +is above 32,000. + +Block compaction searches for a sequence of small blocks and replaces it with one big +block. That means there is an associated cost to block compaction: reading the +small blocks back to the client and writing them again as one big block.
+ +In order to have the files you create be block blobs with block compaction +enabled, the client must set the configuration variable +`fs.azure.block.blob.with.compaction.dir` to a comma-separated list of +folder names. + +For example: + +```xml +<property> + <name>fs.azure.block.blob.with.compaction.dir</name> + <value>/hbase/WALs,/data/myblobfiles</value> +</property> +``` + ### Page Blob Support and Configuration The Azure Blob Storage interface for Hadoop supports two kinds of blobs, http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index 4f26d9f..e0ae7b4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -551,7 +551,8 @@ public class MockStorageInterface extends StorageInterface { throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests"); } @Override - public void uploadBlock(String blockId, InputStream sourceStream, + public void uploadBlock(String blockId, AccessCondition accessCondition, + InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException { throw new UnsupportedOperationException("uploadBlock not used in Mock Tests"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java index 7ea7534..a10a366 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java @@ -107,7 +107,8 @@ public class TestAzureConcurrentOutOfBandIo { // outputStream = writerStorageAccount.getStore().storefile( key, - new PermissionStatus("", "", FsPermission.getDefault())); + new PermissionStatus("", "", FsPermission.getDefault()), + key); Arrays.fill(dataBlockWrite, (byte) (i % 256)); for (int j = 0; j < NUMBER_OF_BLOCKS; j++) { @@ -141,7 +142,8 @@ public class TestAzureConcurrentOutOfBandIo { // reading. This eliminates the race between the reader and writer threads. OutputStream outputStream = testAccount.getStore().storefile( "WASB_String.txt", - new PermissionStatus("", "", FsPermission.getDefault())); + new PermissionStatus("", "", FsPermission.getDefault()), + "WASB_String.txt"); Arrays.fill(dataBlockWrite, (byte) 255); for (int i = 0; i < NUMBER_OF_BLOCKS; i++) { outputStream.write(dataBlockWrite); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org