This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f71995773df [feature](Azure) Support copy into on Azure Blob Storage 
(#36554)
f71995773df is described below

commit f71995773df55424e6a45b784b61309aed836685
Author: AlexYue <[email protected]>
AuthorDate: Mon Jun 24 21:58:35 2024 +0800

    [feature](Azure) Support copy into on Azure Blob Storage (#36554)
---
 .../java/org/apache/doris/analysis/CopyStmt.java   |   1 +
 .../apache/doris/cloud/storage/AzureRemote.java    | 255 +++++++++++++++++++++
 .../org/apache/doris/cloud/storage/BosRemote.java  |   3 +-
 .../org/apache/doris/cloud/storage/CosRemote.java  |   2 +-
 .../org/apache/doris/cloud/storage/ObsRemote.java  |   4 +-
 .../org/apache/doris/cloud/storage/OssRemote.java  |   2 +-
 .../org/apache/doris/cloud/storage/RemoteBase.java |   4 +
 .../org/apache/doris/cloud/storage/S3Remote.java   |   2 +-
 8 files changed, 267 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
index 564d332d321..80ba68ac575 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
@@ -205,6 +205,7 @@ public class CopyStmt extends DdlStmt {
         }
         brokerProperties.put(S3_BUCKET, objInfo.getBucket());
         brokerProperties.put(S3_PREFIX, objInfo.getPrefix());
+        brokerProperties.put(S3Properties.PROVIDER, 
objInfo.getProvider().toString());
         StageProperties stageProperties = new 
StageProperties(stagePB.getPropertiesMap());
         this.copyIntoProperties.mergeProperties(stageProperties);
         this.copyIntoProperties.analyze();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
new file mode 100644
index 00000000000..0dc0cf60019
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.storage;
+
+import org.apache.doris.common.DdlException;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.http.rest.PagedResponse;
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatch;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.blob.models.UserDelegationKey;
+import com.azure.storage.blob.sas.BlobContainerSasPermission;
+import com.azure.storage.blob.sas.BlobSasPermission;
+import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.sas.SasProtocol;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.http.HttpStatus;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import reactor.core.publisher.Mono;
+
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link RemoteBase} implementation that talks to an Azure Blob Storage container.
+ *
+ * The storage account name is read from {@code obj.getAk()}, the account key from
+ * {@code obj.getSk()} and the container name from {@code obj.getBucket()}.
+ */
+public class AzureRemote extends RemoteBase {
+
+    private static final Logger LOG = LogManager.getLogger(AzureRemote.class);
+
+    // Container endpoint; the placeholders are the storage account name and the container name.
+    private static final String URI_TEMPLATE = "https://%s.blob.core.windows.net/%s";
+
+    // The Azure Blob Batch API accepts at most 256 sub-requests in one batch.
+    private static final int MAX_DELETE_BATCH_SIZE = 256;
+
+    // Created lazily by initClient() and dropped by close().
+    private BlobContainerClient client;
+
+    public AzureRemote(ObjectInfo obj) {
+        super(obj);
+    }
+
+    /**
+     * Builds a SAS-signed, HTTPS-only URL for a single blob. The SAS grants read, write and
+     * delete permission and expires {@code SESSION_EXPIRE_SECOND} seconds from now.
+     *
+     * @param fileName file name; converted to the blob key with {@code normalizePrefix(fileName)}
+     * @return the signed URL, or an empty string when the URL could not be generated
+     */
+    @Override
+    public String getPresignedUrl(String fileName) {
+        try {
+            BlobClient blobClient = buildSharedKeyClient().getBlobClient(normalizePrefix(fileName));
+
+            OffsetDateTime now = OffsetDateTime.now();
+            OffsetDateTime expiryTime = now.plusSeconds(SESSION_EXPIRE_SECOND);
+            BlobSasPermission permission = new BlobSasPermission()
+                    .setReadPermission(true)
+                    .setWritePermission(true)
+                    .setDeletePermission(true);
+
+            BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
+                    .setProtocol(SasProtocol.HTTPS_ONLY)
+                    .setStartTime(now);
+
+            String sasToken = blobClient.generateSas(sasValues);
+            return blobClient.getBlobUrl() + "?" + sasToken;
+        } catch (Exception e) {
+            // Callers still receive "" on failure, but the cause is no longer silently dropped.
+            LOG.warn("Failed to get presigned url for Azure, fileName={}", fileName, e);
+        }
+        return "";
+    }
+
+    /**
+     * Lists one page of blobs under the prefix returned by {@code normalizePrefix()}.
+     *
+     * @param continuationToken token returned by a previous page, or null for the first page
+     * @throws DdlException if the listing request fails
+     */
+    @Override
+    public ListObjectsResult listObjects(String continuationToken) throws DdlException {
+        return listObjectsInner(normalizePrefix(), continuationToken);
+    }
+
+    /**
+     * Lists one page of blobs under the prefix returned by {@code normalizePrefix(subPrefix)}.
+     *
+     * @param subPrefix sub prefix handed to {@code normalizePrefix}
+     * @param continuationToken token returned by a previous page, or null for the first page
+     * @throws DdlException if the listing request fails
+     */
+    @Override
+    public ListObjectsResult listObjects(String subPrefix, String continuationToken) throws DdlException {
+        return listObjectsInner(normalizePrefix(subPrefix), continuationToken);
+    }
+
+    /**
+     * Fetches the properties of a single blob.
+     *
+     * @param subKey key handed to {@code normalizePrefix} to obtain the blob key
+     * @return a result holding the blob, or an empty result when the service answers 404
+     * @throws DdlException on any other {@link BlobStorageException}
+     */
+    @Override
+    public ListObjectsResult headObject(String subKey) throws DdlException {
+        initClient();
+        try {
+            String key = normalizePrefix(subKey);
+            BlobClient blobClient = client.getBlobClient(key);
+            BlobProperties properties = blobClient.getProperties();
+            ObjectFile objectFile = new ObjectFile(key, getRelativePath(key), properties.getETag(),
+                    properties.getBlobSize());
+            return new ListObjectsResult(Lists.newArrayList(objectFile), false, null);
+        } catch (BlobStorageException e) {
+            if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+                LOG.warn("NoSuchKey when head object for Azure, subKey={}", subKey);
+                return new ListObjectsResult(Lists.newArrayList(), false, null);
+            }
+            LOG.warn("Failed to head object for Azure, subKey={}", subKey, e);
+            throw new DdlException(
+                    "Failed to head object for Azure, subKey=" + subKey + " Error message=" + e.getMessage());
+        }
+    }
+
+    /**
+     * Generates a container-level user delegation SAS with read, write and list permission.
+     *
+     * @return a triple of (obj.getAk(), obj.getSk(), SAS token)
+     * @throws DdlException if the delegation key or the SAS cannot be obtained
+     */
+    @Override
+    public Triple<String, String, String> getStsToken() throws DdlException {
+        try {
+            // NOTE(review): the delegation key is requested through a shared-key client here;
+            // confirm that the service accepts this credential type for getUserDelegationKey.
+            BlobContainerClient containerClient = buildSharedKeyClient();
+            BlobServiceClient blobServiceClient = containerClient.getServiceClient();
+
+            OffsetDateTime keyStart = OffsetDateTime.now();
+            OffsetDateTime keyExpiry = keyStart.plusSeconds(SESSION_EXPIRE_SECOND);
+            UserDelegationKey userDelegationKey = blobServiceClient.getUserDelegationKey(keyStart, keyExpiry);
+
+            BlobContainerSasPermission permission = new BlobContainerSasPermission()
+                    .setReadPermission(true)
+                    .setWritePermission(true)
+                    .setListPermission(true);
+
+            // Reuse the key's validity window so the SAS never outlives the delegation key.
+            BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(keyExpiry, permission)
+                    .setProtocol(SasProtocol.HTTPS_ONLY)
+                    .setStartTime(keyStart);
+
+            String sasToken = containerClient.generateUserDelegationSas(sasValues, userDelegationKey);
+            return Triple.of(obj.getAk(), obj.getSk(), sasToken);
+        } catch (Throwable e) {
+            LOG.warn("Failed get Azure sts token", e);
+            throw new DdlException(e.getMessage());
+        }
+    }
+
+    /**
+     * Deletes the given blobs with the Blob Batch API, at most
+     * {@code MAX_DELETE_BATCH_SIZE} keys per request.
+     *
+     * @param keys blob keys to delete; validated by {@code checkDeleteKeys} first
+     * @throws DdlException if a batch is rejected or the service reports an error
+     */
+    @Override
+    public void deleteObjects(List<String> keys) throws DdlException {
+        checkDeleteKeys(keys);
+        initClient();
+
+        try {
+            BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(
+                    client.getServiceClient()).buildClient();
+            String containerName = client.getBlobContainerName();
+            // Stepping by the batch size never produces an empty trailing batch.
+            for (int start = 0; start < keys.size(); start += MAX_DELETE_BATCH_SIZE) {
+                int end = Math.min(start + MAX_DELETE_BATCH_SIZE, keys.size());
+                BlobBatch batch = blobBatchClient.getBlobBatch();
+                for (String key : keys.subList(start, end)) {
+                    batch.deleteBlob(containerName, key);
+                }
+                Response<Void> resp = blobBatchClient.submitBatchWithResponse(batch, true, null, Context.NONE);
+                int statusCode = resp.getStatusCode();
+                // A successful batch submission is answered with 202 (Accepted), so treat any 2xx as success.
+                if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_MULTIPLE_CHOICES) {
+                    throw new DdlException(
+                            "Failed delete objects, bucket=" + obj.getBucket());
+                }
+            }
+        } catch (BlobStorageException e) {
+            LOG.warn("Failed to delete objects for Azure", e);
+            throw new DdlException("Failed to delete objects for Azure, Error message=" + e.getMessage());
+        }
+    }
+
+    /** Drops the cached container client; the next request builds a new one. */
+    @Override
+    public void close() {
+        client = null;
+    }
+
+    @Override
+    public String toString() {
+        return "AzureRemote{obj=" + obj + '}';
+    }
+
+    /**
+     * Fetches a single page of blobs whose names start with {@code prefix}.
+     *
+     * @param prefix blob name prefix to list
+     * @param continuationToken token of the page to fetch, or null for the first page
+     * @return the blobs of that page plus the token of the next page, if any
+     * @throws DdlException if the service reports an error
+     */
+    private ListObjectsResult listObjectsInner(String prefix, String continuationToken) throws DdlException {
+        initClient();
+        try {
+            ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
+            PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, continuationToken, null);
+            // Only the first page is consumed; the caller asks for the next one with the returned token.
+            PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
+            List<ObjectFile> objectFiles = new ArrayList<>();
+
+            for (BlobItem blobItem : pagedResponse.getElements()) {
+                objectFiles.add(new ObjectFile(blobItem.getName(), getRelativePath(blobItem.getName()),
+                        blobItem.getProperties().getETag(), blobItem.getProperties().getContentLength()));
+            }
+            String nextToken = pagedResponse.getContinuationToken();
+            // Same flag convention as headObject(), which passes false together with a null token:
+            // the flag is set only while a next page exists.
+            return new ListObjectsResult(objectFiles, nextToken != null, nextToken);
+        } catch (BlobStorageException e) {
+            LOG.warn("Failed to list objects for Azure", e);
+            throw new DdlException("Failed to list objects for Azure, Error message=" + e.getMessage());
+        }
+    }
+
+    /** Builds a container client authenticated with the account name and key from {@code obj}. */
+    private BlobContainerClient buildSharedKeyClient() {
+        BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
+        builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
+        builder.endpoint(String.format(URI_TEMPLATE, obj.getAk(), obj.getBucket()));
+        return builder.buildClient();
+    }
+
+    /**
+     * Creates the cached container client on first use. A token from {@code obj.getToken()}
+     * takes precedence over the shared key when it is present.
+     */
+    private void initClient() {
+        if (client == null) {
+            BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
+            if (obj.getToken() != null) {
+                builder.credential(new SimpleTokenCredential(obj.getToken()));
+            } else {
+                builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
+            }
+            String containerName = obj.getBucket();
+            String uri = String.format(URI_TEMPLATE, obj.getAk(),
+                    containerName);
+            builder.endpoint(uri);
+            client = builder.buildClient();
+        }
+    }
+
+    /** {@link TokenCredential} that always hands out the fixed token it was constructed with. */
+    private static class SimpleTokenCredential implements TokenCredential {
+        private static final Logger LOG = LogManager.getLogger(SimpleTokenCredential.class);
+        private final String token;
+
+        SimpleTokenCredential(String token) {
+            this.token = token;
+        }
+
+        @Override
+        public Mono<AccessToken> getToken(TokenRequestContext request) {
+            LOG.info("Getting token for scopes: {}", String.join(", ", request.getScopes()));
+            // The real expiry is unknown here; report SESSION_EXPIRE_SECOND seconds from now.
+            return Mono.just(new AccessToken(token, OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND)));
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
index c55eb9031fa..b3da80e9bb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
@@ -42,7 +42,8 @@ public class BosRemote extends DefaultRemote {
         config.setCredentials(new DefaultBceCredentials(obj.getAk(), 
obj.getSk()));
         config.setEndpoint(obj.getEndpoint());
         BosClient client = new BosClient(config);
-        URL url = client.generatePresignedUrl(obj.getBucket(), 
normalizePrefix(fileName), 3600, HttpMethodName.PUT);
+        URL url = client.generatePresignedUrl(obj.getBucket(), 
normalizePrefix(fileName),
+                (int) SESSION_EXPIRE_SECOND, HttpMethodName.PUT);
         LOG.info("Bos getPresignedUrl: {}", url);
         return url.toString();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
index 7d1aa27b7d9..e541014e78e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
@@ -56,7 +56,7 @@ public class CosRemote extends DefaultRemote {
         clientConfig.setRegion(new Region(obj.getRegion()));
         clientConfig.setHttpProtocol(HttpProtocol.https);
         COSClient cosClient = new COSClient(cred, clientConfig);
-        Date expirationDate = new Date(System.currentTimeMillis() + 60 * 60 * 
1000);
+        Date expirationDate = new Date(System.currentTimeMillis() + SESSION_EXPIRE_SECOND * 1000); // sec -> ms
         URL url = cosClient.generatePresignedUrl(obj.getBucket(),
                 normalizePrefix(fileName), expirationDate, HttpMethodName.PUT,
                 new HashMap<String, String>(), new HashMap<String, String>());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
index 6f8e332f6f4..053066c11bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
@@ -55,8 +55,8 @@ public class ObsRemote extends DefaultRemote {
         String sk = obj.getSk();
 
         ObsClient obsClient = new ObsClient(ak, sk, endPoint);
-        long expireSeconds = 3600L;
-        TemporarySignatureRequest request = new 
TemporarySignatureRequest(HttpMethodEnum.PUT, expireSeconds);
+        TemporarySignatureRequest request = new TemporarySignatureRequest(
+                HttpMethodEnum.PUT, SESSION_EXPIRE_SECOND);
         request.setBucketName(obj.getBucket());
         request.setObjectKey(normalizePrefix(fileName));
         request.setHeaders(new HashMap<String, String>());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
index 139d2bbd415..42e019c77ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
@@ -65,7 +65,7 @@ public class OssRemote extends DefaultRemote {
         try {
             GeneratePresignedUrlRequest request
                     = new GeneratePresignedUrlRequest(bucketName, objectName, 
HttpMethod.PUT);
-            Date expiration = new Date(new Date().getTime() + 3600 * 1000);
+            Date expiration = new Date(new Date().getTime() + 
SESSION_EXPIRE_SECOND * 1000);
             request.setExpiration(expiration);
             URL signedUrl = ossClient.generatePresignedUrl(request);
             return signedUrl.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
index e146e52f534..b15c9fb35a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
@@ -116,6 +116,8 @@ public abstract class RemoteBase {
 
     public ObjectInfo obj;
 
+    protected static long SESSION_EXPIRE_SECOND = 3600;
+
     public RemoteBase(ObjectInfo obj) {
         this.obj = obj;
     }
@@ -149,6 +151,8 @@ public abstract class RemoteBase {
                 return new ObsRemote(obj);
             case BOS:
                 return new BosRemote(obj);
+            case AZURE:
+                return new AzureRemote(obj);
             default:
                 throw new Exception("current not support obj : " + 
obj.toString());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
index 49d82ab5ae2..35e9dc7b39a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
@@ -54,7 +54,7 @@ public class S3Remote extends DefaultRemote {
                     .build();
 
             PutObjectPresignRequest presignRequest = 
PutObjectPresignRequest.builder()
-                    .signatureDuration(Duration.ofMinutes(60))
+                    
.signatureDuration(Duration.ofSeconds(SESSION_EXPIRE_SECOND))
                     .putObjectRequest(objectRequest)
                     .build();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to