HADOOP-13354. Update WASB driver to use the latest version (4.2.0) of SDK for Microsoft Azure Storage Clients. Contributed by Sivaguru Sankaridurg.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b43de800 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b43de800 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b43de800 Branch: refs/heads/HADOOP-12756 Commit: b43de80031d1272e8a08ea5bd31027efe45e9d70 Parents: eb7ff0c Author: Chris Nauroth <cnaur...@apache.org> Authored: Wed Jul 27 15:50:28 2016 -0700 Committer: Chris Nauroth <cnaur...@apache.org> Committed: Wed Jul 27 15:50:38 2016 -0700 ---------------------------------------------------------------------- hadoop-project/pom.xml | 2 +- .../hadoop/fs/azure/BlockBlobAppendStream.java | 99 ++++++++++++++++---- .../hadoop/fs/azure/SendRequestIntercept.java | 2 +- .../hadoop/fs/azure/StorageInterfaceImpl.java | 2 +- 4 files changed, 85 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 318573a..dee79f7 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -996,7 +996,7 @@ <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-storage</artifactId> - <version>2.2.0</version> + <version>4.2.0</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index d1ec8df..e419a3b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -28,6 +28,7 @@ import java.util.Calendar; import java.util.HashMap; import java.util.Locale; import java.util.List; +import java.util.UUID; import java.util.Random; import java.util.TimeZone; import java.util.concurrent.LinkedBlockingQueue; @@ -98,6 +99,12 @@ public class BlockBlobAppendStream extends OutputStream { */ private long nextBlockCount = UNSET_BLOCKS_COUNT; + /** + * Variable to hold the block id prefix to be used for azure + * storage blocks from azure-storage-java sdk version 4.2.0 onwards + */ + private String blockIdPrefix = null; + private final Random sequenceGenerator = new Random(); /** @@ -180,7 +187,8 @@ public class BlockBlobAppendStream extends OutputStream { this.key = aKey; this.bufferSize = bufferSize; this.threadSequenceNumber = new AtomicInteger(0); - setBlocksCount(); + this.blockIdPrefix = null; + setBlocksCountAndBlockIdPrefix(); this.outBuffer = new ByteArrayOutputStream(bufferSize); this.uncommittedBlockEntries = new ArrayList<BlockEntry>(); @@ -433,22 +441,41 @@ public class BlockBlobAppendStream extends OutputStream { * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure * storage SDK. */ - private void setBlocksCount() throws IOException { - try { + private void setBlocksCountAndBlockIdPrefix() throws IOException { - if (nextBlockCount == UNSET_BLOCKS_COUNT) { + try { - nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE)) - + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT); + if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) { List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext); - nextBlockCount += blockEntries.size(); + String blockZeroBlockId = (blockEntries.size() > 0) ? 
blockEntries.get(0).getId() : ""; + String prefix = UUID.randomUUID().toString() + "-"; + String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0); + + if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) { + + // If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId + // compute nextBlockCount, the way it was done before; and don't use blockIdPrefix + this.blockIdPrefix = ""; + nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE)) + + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT); + nextBlockCount += blockEntries.size(); + + } else { + + // If there are no existing blocks, create the first block with newer version (4.2.0) blockId + // If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId + this.blockIdPrefix = prefix; + nextBlockCount = blockEntries.size(); + + } } + } catch (StorageException ex) { - LOG.debug("Encountered storage exception during setting next Block Count." + LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix." + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode()); throw new IOException(ex); } @@ -465,7 +492,40 @@ public class BlockBlobAppendStream extends OutputStream { throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly"); } - byte[] blockIdInBytes = getBytesFromLong(nextBlockCount); + if (this.blockIdPrefix == null) { + throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly"); + } + + if (!this.blockIdPrefix.equals("")) { + + return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++); + + } else { + + return generateOlderVersionBlockId(nextBlockCount++); + + } + + } + + /** + * Helper method that generates an older (2.2.0) version blockId + * @return String representing the block ID generated. 
+ */ + private String generateOlderVersionBlockId(long id) { + + byte[] blockIdInBytes = getBytesFromLong(id); + return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); + } + + /** + * Helper method that generates a newer (4.2.0) version blockId + * @return String representing the block ID generated. + */ + private String generateNewerVersionBlockId(String prefix, long id) { + + String blockIdSuffix = String.format("%06d", id); + byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8); return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); } @@ -481,28 +541,33 @@ public class BlockBlobAppendStream extends OutputStream { * @return A byte array that represents the data of the specified <code>long</code> value. */ private static byte[] getBytesFromLong(final long value) { - final byte[] tempArray = new byte[8]; - for (int m = 0; m < 8; m++) { - tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF); - } + final byte[] tempArray = new byte[8]; - return tempArray; + for (int m = 0; m < 8; m++) { + tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF); + } + + return tempArray; } + /** * Helper method that creates a thread to upload a block to azure storage. * @param payload * @throws IOException */ - private synchronized void uploadBlockToStorage(byte[] payload) throws IOException { + private synchronized void uploadBlockToStorage(byte[] payload) + throws IOException { // upload payload to azure storage - nextBlockCount++; String blockId = generateBlockId(); + // Since uploads of the Azure storage are done in parallel threads, we go ahead // add the blockId in the uncommitted list. If the upload of the block fails // we don't commit the blockIds. 
- uncommittedBlockEntries.add(new BlockEntry(blockId)); + BlockEntry blockEntry = new BlockEntry(blockId); + blockEntry.setSize(payload.length); + uncommittedBlockEntries.add(blockEntry); ioThreadPool.execute(new WriteRequest(payload, blockId)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java index 4d564d5..f86f392 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java @@ -147,7 +147,7 @@ public final class SendRequestIntercept extends StorageEvent<SendingRequestEvent try { // Sign the request. GET's have no payload so the content length is // zero. 
- StorageCredentialsHelper.signBlobAndQueueRequest(getCredentials(), + StorageCredentialsHelper.signBlobQueueAndFileRequest(getCredentials(), urlConnection, -1L, getOperationContext()); } catch (InvalidKeyException e) { // Log invalid key exception to track signing error before the send http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 298f3aa..367cd04 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -404,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface { public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, OperationContext opContext) throws StorageException, URISyntaxException { - getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob, + getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(), null, null, options, opContext); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org