HADOOP-13354. Update WASB driver to use the latest version (4.2.0) of SDK for 
Microsoft Azure Storage Clients. Contributed by Sivaguru Sankaridurg.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b43de800
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b43de800
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b43de800

Branch: refs/heads/HADOOP-12756
Commit: b43de80031d1272e8a08ea5bd31027efe45e9d70
Parents: eb7ff0c
Author: Chris Nauroth <cnaur...@apache.org>
Authored: Wed Jul 27 15:50:28 2016 -0700
Committer: Chris Nauroth <cnaur...@apache.org>
Committed: Wed Jul 27 15:50:38 2016 -0700

----------------------------------------------------------------------
 hadoop-project/pom.xml                          |  2 +-
 .../hadoop/fs/azure/BlockBlobAppendStream.java  | 99 ++++++++++++++++----
 .../hadoop/fs/azure/SendRequestIntercept.java   |  2 +-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |  2 +-
 4 files changed, 85 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 318573a..dee79f7 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -996,7 +996,7 @@
       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>azure-storage</artifactId>
-        <version>2.2.0</version>
+        <version>4.2.0</version>
      </dependency>
 
      <dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
index d1ec8df..e419a3b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -28,6 +28,7 @@ import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.List;
+import java.util.UUID;
 import java.util.Random;
 import java.util.TimeZone;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -98,6 +99,12 @@ public class BlockBlobAppendStream extends OutputStream {
    */
   private long nextBlockCount = UNSET_BLOCKS_COUNT;
 
+  /**
+   * Variable to hold the block id prefix to be used for azure
+   * storage blocks from azure-storage-java sdk version 4.2.0 onwards
+   */
+  private String blockIdPrefix = null;
+
   private final Random sequenceGenerator = new Random();
 
   /**
@@ -180,7 +187,8 @@ public class BlockBlobAppendStream extends OutputStream {
     this.key = aKey;
     this.bufferSize = bufferSize;
     this.threadSequenceNumber = new AtomicInteger(0);
-    setBlocksCount();
+    this.blockIdPrefix = null;
+    setBlocksCountAndBlockIdPrefix();
 
     this.outBuffer = new ByteArrayOutputStream(bufferSize);
     this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
@@ -433,22 +441,41 @@ public class BlockBlobAppendStream extends OutputStream {
    * Helper method used to generate the blockIDs. The algorithm used is 
similar to the Azure
    * storage SDK.
    */
-  private void setBlocksCount() throws IOException {
-    try {
+  private void setBlocksCountAndBlockIdPrefix() throws IOException {
 
-      if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+    try {
 
-        nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
-            + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+      if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) {
 
         List<BlockEntry> blockEntries =
             blob.downloadBlockList(BlockListingFilter.COMMITTED, new 
BlobRequestOptions(), opContext);
 
-        nextBlockCount += blockEntries.size();
+        String blockZeroBlockId = (blockEntries.size() > 0) ? 
blockEntries.get(0).getId() : "";
+        String prefix = UUID.randomUUID().toString() + "-";
+        String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 
0);
+
+        if (blockEntries.size() > 0 && blockZeroBlockId.length() < 
sampleNewerVersionBlockId.length()) {
+
+          // If blob has already been created with 2.2.0, append subsequent 
blocks with older version (2.2.0) blockId
+          // compute nextBlockCount, the way it was done before; and don't use 
blockIdPrefix
+          this.blockIdPrefix = "";
+          nextBlockCount = (long) 
(sequenceGenerator.nextInt(Integer.MAX_VALUE))
+              + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+          nextBlockCount += blockEntries.size();
+
+        } else {
+
+          // If there are no existing blocks, create the first block with 
newer version (4.2.0) blockId
+          // If blob has already been created with 4.2.0, append subsequent 
blocks with newer version (4.2.0) blockId
+          this.blockIdPrefix = prefix;
+          nextBlockCount = blockEntries.size();
+
+        }
 
       }
+
     } catch (StorageException ex) {
-      LOG.debug("Encountered storage exception during setting next Block 
Count."
+      LOG.debug("Encountered storage exception during setting next Block Count 
and BlockId prefix."
           + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
       throw new IOException(ex);
     }
@@ -465,7 +492,40 @@ public class BlockBlobAppendStream extends OutputStream {
       throw new IOException("Append Stream in invalid state. nextBlockCount 
not set correctly");
     }
 
-    byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
+    if (this.blockIdPrefix == null) {
+      throw new IOException("Append Stream in invalid state. blockIdPrefix not 
set correctly");
+    }
+
+    if (!this.blockIdPrefix.equals("")) {
+
+      return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++);
+
+    } else {
+
+      return generateOlderVersionBlockId(nextBlockCount++);
+
+    }
+
+  }
+
+  /**
+   * Helper method that generates an older (2.2.0) version blockId
+   * @return String representing the block ID generated.
+   */
+  private String generateOlderVersionBlockId(long id) {
+
+    byte[] blockIdInBytes = getBytesFromLong(id);
+    return new String(Base64.encodeBase64(blockIdInBytes), 
StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Helper method that generates a newer (4.2.0) version blockId
+   * @return String representing the block ID generated.
+   */
+  private String generateNewerVersionBlockId(String prefix, long id) {
+
+    String blockIdSuffix  = String.format("%06d", id);
+    byte[] blockIdInBytes = (prefix + 
blockIdSuffix).getBytes(StandardCharsets.UTF_8);
     return new String(Base64.encodeBase64(blockIdInBytes), 
StandardCharsets.UTF_8);
   }
 
@@ -481,28 +541,33 @@ public class BlockBlobAppendStream extends OutputStream {
    * @return A byte array that represents the data of the specified 
<code>long</code> value.
    */
   private static byte[] getBytesFromLong(final long value) {
-      final byte[] tempArray = new byte[8];
 
-      for (int m = 0; m < 8; m++) {
-          tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
-      }
+    final byte[] tempArray = new byte[8];
 
-      return tempArray;
+    for (int m = 0; m < 8; m++) {
+      tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+    }
+
+    return tempArray;
   }
+
   /**
    * Helper method that creates a thread to upload a block to azure storage.
    * @param payload
    * @throws IOException
    */
-  private synchronized void uploadBlockToStorage(byte[] payload) throws 
IOException {
+  private synchronized void uploadBlockToStorage(byte[] payload)
+      throws IOException {
 
     // upload payload to azure storage
-    nextBlockCount++;
     String blockId = generateBlockId();
+
     // Since uploads of the Azure storage are done in parallel threads, we go 
ahead
     // add the blockId in the uncommitted list. If the upload of the block 
fails
     // we don't commit the blockIds.
-    uncommittedBlockEntries.add(new BlockEntry(blockId));
+    BlockEntry blockEntry = new BlockEntry(blockId);
+    blockEntry.setSize(payload.length);
+    uncommittedBlockEntries.add(blockEntry);
     ioThreadPool.execute(new WriteRequest(payload, blockId));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
index 4d564d5..f86f392 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
@@ -147,7 +147,7 @@ public final class SendRequestIntercept extends 
StorageEvent<SendingRequestEvent
       try {
         // Sign the request. GET's have no payload so the content length is
         // zero.
-        StorageCredentialsHelper.signBlobAndQueueRequest(getCredentials(),
+        StorageCredentialsHelper.signBlobQueueAndFileRequest(getCredentials(),
           urlConnection, -1L, getOperationContext());
       } catch (InvalidKeyException e) {
         // Log invalid key exception to track signing error before the send

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b43de800/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 298f3aa..367cd04 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -404,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface {
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, 
BlobRequestOptions options,
         OperationContext opContext)
             throws StorageException, URISyntaxException {
-      getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
+      getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
           null, null, options, opContext);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to