This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f71995773df [feature](Azure) Support copy into on Azure Blob Storage
(#36554)
f71995773df is described below
commit f71995773df55424e6a45b784b61309aed836685
Author: AlexYue <[email protected]>
AuthorDate: Mon Jun 24 21:58:35 2024 +0800
[feature](Azure) Support copy into on Azure Blob Storage (#36554)
---
.../java/org/apache/doris/analysis/CopyStmt.java | 1 +
.../apache/doris/cloud/storage/AzureRemote.java | 255 +++++++++++++++++++++
.../org/apache/doris/cloud/storage/BosRemote.java | 3 +-
.../org/apache/doris/cloud/storage/CosRemote.java | 2 +-
.../org/apache/doris/cloud/storage/ObsRemote.java | 4 +-
.../org/apache/doris/cloud/storage/OssRemote.java | 2 +-
.../org/apache/doris/cloud/storage/RemoteBase.java | 4 +
.../org/apache/doris/cloud/storage/S3Remote.java | 2 +-
8 files changed, 267 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
index 564d332d321..80ba68ac575 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
@@ -205,6 +205,7 @@ public class CopyStmt extends DdlStmt {
}
brokerProperties.put(S3_BUCKET, objInfo.getBucket());
brokerProperties.put(S3_PREFIX, objInfo.getPrefix());
+ brokerProperties.put(S3Properties.PROVIDER,
objInfo.getProvider().toString());
StageProperties stageProperties = new
StageProperties(stagePB.getPropertiesMap());
this.copyIntoProperties.mergeProperties(stageProperties);
this.copyIntoProperties.analyze();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
new file mode 100644
index 00000000000..0dc0cf60019
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.storage;
+
+import org.apache.doris.common.DdlException;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.http.rest.PagedResponse;
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatch;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.blob.models.UserDelegationKey;
+import com.azure.storage.blob.sas.BlobContainerSasPermission;
+import com.azure.storage.blob.sas.BlobSasPermission;
+import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.sas.SasProtocol;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.http.HttpStatus;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import reactor.core.publisher.Mono;
+
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AzureRemote extends RemoteBase {
+
+ private static final Logger LOG = LogManager.getLogger(AzureRemote.class);
+
+ private static final String URI_TEMPLATE =
"https://%s.blob.core.windows.net/%s";
+
+ private BlobContainerClient client;
+
+    /**
+     * Creates a remote accessor for Azure Blob Storage.
+     *
+     * @param obj object-store description; forwarded unchanged to the {@code RemoteBase} constructor
+     */
+    public AzureRemote(ObjectInfo obj) {
+        super(obj);
+    }
+
+    /**
+     * Builds a SAS-signed URL for a single blob.
+     *
+     * <p>The SAS grants read, write and delete permission on the blob, is restricted to HTTPS and is
+     * valid from now for {@code SESSION_EXPIRE_SECOND} seconds.
+     *
+     * @param fileName file name; passed through {@code normalizePrefix} to obtain the blob name
+     * @return the blob URL followed by {@code ?} and the SAS token, or an empty string if signing failed
+     */
+    @Override
+    public String getPresignedUrl(String fileName) {
+        try {
+            BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
+            builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
+            String containerName = obj.getBucket();
+            // obj.getAk() fills the account-name slot of the endpoint host in URI_TEMPLATE.
+            String uri = String.format(URI_TEMPLATE, obj.getAk(), containerName);
+            builder.endpoint(uri);
+            BlobContainerClient containerClient = builder.buildClient();
+
+            BlobClient blobClient = containerClient.getBlobClient(normalizePrefix(fileName));
+
+            // Take one timestamp so that start and expiry are derived from the same instant.
+            OffsetDateTime now = OffsetDateTime.now();
+            OffsetDateTime expiryTime = now.plusSeconds(SESSION_EXPIRE_SECOND);
+            BlobSasPermission permission = new BlobSasPermission()
+                    .setReadPermission(true)
+                    .setWritePermission(true)
+                    .setDeletePermission(true);
+
+            BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
+                    .setProtocol(SasProtocol.HTTPS_ONLY)
+                    .setStartTime(now);
+
+            String sasToken = blobClient.generateSas(sasValues);
+            return blobClient.getBlobUrl() + "?" + sasToken;
+        } catch (Exception e) {
+            // The failure used to be dropped by a no-op e.getStackTrace(); record it so that an
+            // empty return value can be diagnosed. The empty-string contract is kept for callers.
+            LOG.warn("Failed to generate presigned url for Azure, fileName={}", fileName, e);
+        }
+        return "";
+    }
+
+    /**
+     * Lists one page of blobs under the prefix returned by {@code normalizePrefix()}.
+     *
+     * @param continuationToken token handed to the listing call to select the page
+     * @return the listing produced by {@code listObjectsInner}
+     * @throws DdlException if the listing fails
+     */
+    @Override
+    public ListObjectsResult listObjects(String continuationToken) throws DdlException {
+        final String prefix = normalizePrefix();
+        return listObjectsInner(prefix, continuationToken);
+    }
+
+    /**
+     * Lists one page of blobs under the prefix returned by {@code normalizePrefix(subPrefix)}.
+     *
+     * @param subPrefix sub-prefix that is normalized before listing
+     * @param continuationToken token handed to the listing call to select the page
+     * @return the listing produced by {@code listObjectsInner}
+     * @throws DdlException if the listing fails
+     */
+    @Override
+    public ListObjectsResult listObjects(String subPrefix, String continuationToken) throws DdlException {
+        final String prefix = normalizePrefix(subPrefix);
+        return listObjectsInner(prefix, continuationToken);
+    }
+
+    /**
+     * Looks up the properties of a single blob and wraps them in a listing result.
+     *
+     * @param subKey key that is normalized with {@code normalizePrefix} to obtain the blob name
+     * @return a result holding one {@code ObjectFile} for the blob, or an empty result when the
+     *         service answers 404 for it
+     * @throws DdlException if the lookup fails with any status other than 404
+     */
+    @Override
+    public ListObjectsResult headObject(String subKey) throws DdlException {
+        initClient();
+        try {
+            final String blobName = normalizePrefix(subKey);
+            BlobProperties props = client.getBlobClient(blobName).getProperties();
+            List<ObjectFile> found = Lists.newArrayList(
+                    new ObjectFile(blobName, getRelativePath(blobName), props.getETag(), props.getBlobSize()));
+            return new ListObjectsResult(found, false, null);
+        } catch (BlobStorageException e) {
+            if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) {
+                LOG.warn("Failed to head object for Azure, subKey={}", subKey, e);
+                throw new DdlException(
+                        "Failed to head object for Azure, subKey=" + subKey + " Error message=" + e.getMessage());
+            }
+            // A missing blob is reported as an empty result rather than as an error.
+            LOG.warn("NoSuchKey when head object for Azure, subKey={}", subKey);
+            return new ListObjectsResult(Lists.newArrayList(), false, null);
+        }
+    }
+
+    /**
+     * Generates a user-delegation SAS token for the container.
+     *
+     * <p>The SAS grants read, write and list permission, is restricted to HTTPS, and shares its
+     * start and expiry with the user delegation key requested here
+     * ({@code SESSION_EXPIRE_SECOND} seconds from now).
+     *
+     * @return a triple of {@code obj.getAk()}, {@code obj.getSk()} and the SAS token
+     * @throws DdlException if any step fails; the message is that of the underlying throwable
+     */
+    @Override
+    public Triple<String, String, String> getStsToken() throws DdlException {
+        try {
+            BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
+            builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
+            String containerName = obj.getBucket();
+            String uri = String.format(URI_TEMPLATE, obj.getAk(), containerName);
+            builder.endpoint(uri);
+            BlobContainerClient containerClient = builder.buildClient();
+            BlobServiceClient blobServiceClient = containerClient.getServiceClient();
+
+            OffsetDateTime keyStart = OffsetDateTime.now();
+            OffsetDateTime keyExpiry = keyStart.plusSeconds(SESSION_EXPIRE_SECOND);
+            UserDelegationKey userDelegationKey = blobServiceClient.getUserDelegationKey(keyStart, keyExpiry);
+
+            BlobContainerSasPermission permission = new BlobContainerSasPermission()
+                    .setReadPermission(true)
+                    .setWritePermission(true)
+                    .setListPermission(true);
+
+            // Reuse the key's window instead of calling now() again after the network round trip;
+            // otherwise the SAS expiry lands after the expiry of the key that signs it.
+            BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(keyExpiry, permission)
+                    .setProtocol(SasProtocol.HTTPS_ONLY)
+                    .setStartTime(keyStart);
+
+            String sasToken = containerClient.generateUserDelegationSas(sasValues, userDelegationKey);
+            return Triple.of(obj.getAk(), obj.getSk(), sasToken);
+        } catch (Throwable e) {
+            LOG.warn("Failed get Azure sts token", e);
+            throw new DdlException(e.getMessage());
+        }
+    }
+
+    /**
+     * Deletes the given blobs from the container using Blob Batch requests.
+     *
+     * <p>Keys are submitted in consecutive chunks of at most 256 entries. An empty key list that
+     * passes {@code checkDeleteKeys} results in no request at all.
+     *
+     * @param keys blob names to delete; validated by {@code checkDeleteKeys} first
+     * @throws DdlException if a batch is answered with a non-2xx status or the service reports a
+     *         {@code BlobStorageException}
+     */
+    @Override
+    public void deleteObjects(List<String> keys) throws DdlException {
+        checkDeleteKeys(keys);
+        initClient();
+
+        try {
+            BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(
+                    client.getServiceClient()).buildClient();
+            String containerName = client.getBlobContainerName();
+            // The Blob Batch REST API accepts at most 256 sub-requests per batch.
+            final int maxDelete = 256;
+            // Advance by start offset: the former "size / max + 1" bound produced an empty trailing
+            // batch whenever the number of keys was a multiple of the batch size (or zero).
+            for (int start = 0; start < keys.size(); start += maxDelete) {
+                int end = Math.min(start + maxDelete, keys.size());
+                BlobBatch batch = blobBatchClient.getBlobBatch();
+                for (String key : keys.subList(start, end)) {
+                    batch.deleteBlob(containerName, key);
+                }
+                Response<Void> resp = blobBatchClient.submitBatchWithResponse(batch, true, null, Context.NONE);
+                int status = resp.getStatusCode();
+                // Blob Batch is documented to answer 202 Accepted, so accept any 2xx, not only 200.
+                if (status < HttpStatus.SC_OK || status >= HttpStatus.SC_MULTIPLE_CHOICES) {
+                    throw new DdlException(
+                            "Failed delete objects, bucket=" + obj.getBucket());
+                }
+            }
+        } catch (BlobStorageException e) {
+            LOG.warn("Failed to delete objects for Azure", e);
+            throw new DdlException("Failed to delete objects for Azure, Error message=" + e.getMessage());
+        }
+    }
+
+    /** Drops the reference to the cached container client; {@code initClient} builds a new one on demand. */
+    @Override
+    public void close() {
+        this.client = null;
+    }
+
+    /** Returns {@code AzureRemote{obj=...}} with the string form of the wrapped object info. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("AzureRemote{obj=");
+        text.append(obj);
+        text.append('}');
+        return text.toString();
+    }
+
+    /**
+     * Lists a single page of blobs whose names start with {@code prefix}.
+     *
+     * @param prefix blob-name prefix to list
+     * @param continuationToken token passed to {@code listBlobs} to select the page
+     * @return the blobs of that page, whether more pages remain, and the token for the next page
+     * @throws DdlException if the service reports a {@code BlobStorageException}
+     */
+    private ListObjectsResult listObjectsInner(String prefix, String continuationToken) throws DdlException {
+        initClient();
+        try {
+            ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
+            PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, continuationToken, null);
+            // Only the first page of the iterable is consumed; paging is driven by the caller
+            // through the returned continuation token.
+            PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
+            List<ObjectFile> objectFiles = new ArrayList<>();
+
+            for (BlobItem blobItem : pagedResponse.getElements()) {
+                objectFiles.add(new ObjectFile(blobItem.getName(), getRelativePath(blobItem.getName()),
+                        blobItem.getProperties().getETag(), blobItem.getProperties().getContentLength()));
+            }
+            String nextToken = pagedResponse.getContinuationToken();
+            // More results exist only when the service handed back a continuation token. The flag
+            // used to be "token == null", the opposite of what headObject passes with a null token.
+            // NOTE(review): assumes the second ListObjectsResult argument means "truncated" - confirm.
+            boolean truncated = nextToken != null && !nextToken.isEmpty();
+            return new ListObjectsResult(objectFiles, truncated, nextToken);
+        } catch (BlobStorageException e) {
+            LOG.warn("Failed to list objects for Azure", e);
+            throw new DdlException("Failed to list objects for Azure, Error message=" + e.getMessage());
+        }
+    }
+
+    /**
+     * Builds the container client on first use and caches it in {@code client}.
+     *
+     * <p>Uses a {@code SimpleTokenCredential} when {@code obj.getToken()} is non-null and a shared-key
+     * credential built from {@code obj.getAk()} / {@code obj.getSk()} otherwise.
+     */
+    private void initClient() {
+        if (client != null) {
+            return;
+        }
+        BlobContainerClientBuilder clientBuilder = new BlobContainerClientBuilder();
+        if (obj.getToken() == null) {
+            clientBuilder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
+        } else {
+            clientBuilder.credential(new SimpleTokenCredential(obj.getToken()));
+        }
+        String endpoint = String.format(URI_TEMPLATE, obj.getAk(), obj.getBucket());
+        clientBuilder.endpoint(endpoint);
+        client = clientBuilder.buildClient();
+    }
+
+    /**
+     * {@code TokenCredential} that always hands out the fixed token string it was constructed with.
+     */
+    private static class SimpleTokenCredential implements TokenCredential {
+        private static final Logger LOG = LogManager.getLogger(SimpleTokenCredential.class);
+        private final String token;
+
+        SimpleTokenCredential(String token) {
+            this.token = token;
+        }
+
+        /**
+         * Wraps the stored token in an {@code AccessToken} that is reported to expire
+         * {@code SESSION_EXPIRE_SECOND} seconds from the time of the call.
+         *
+         * @param request token request; only its scopes are read, for logging
+         * @return a {@code Mono} that emits the access token
+         */
+        @Override
+        public Mono<AccessToken> getToken(TokenRequestContext request) {
+            String scopes = String.join(", ", request.getScopes());
+            LOG.info("Getting token for scopes: {}", scopes);
+            // The real lifetime of the token is not known here; a fixed window from now is reported.
+            OffsetDateTime expiresAt = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
+            return Mono.just(new AccessToken(token, expiresAt));
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
index c55eb9031fa..b3da80e9bb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java
@@ -42,7 +42,8 @@ public class BosRemote extends DefaultRemote {
config.setCredentials(new DefaultBceCredentials(obj.getAk(),
obj.getSk()));
config.setEndpoint(obj.getEndpoint());
BosClient client = new BosClient(config);
- URL url = client.generatePresignedUrl(obj.getBucket(),
normalizePrefix(fileName), 3600, HttpMethodName.PUT);
+ URL url = client.generatePresignedUrl(obj.getBucket(),
normalizePrefix(fileName),
+ (int) SESSION_EXPIRE_SECOND, HttpMethodName.PUT);
LOG.info("Bos getPresignedUrl: {}", url);
return url.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
index 7d1aa27b7d9..e541014e78e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java
@@ -56,7 +56,7 @@ public class CosRemote extends DefaultRemote {
clientConfig.setRegion(new Region(obj.getRegion()));
clientConfig.setHttpProtocol(HttpProtocol.https);
COSClient cosClient = new COSClient(cred, clientConfig);
- Date expirationDate = new Date(System.currentTimeMillis() + 60 * 60 *
1000);
+        Date expirationDate = new Date(System.currentTimeMillis() + SESSION_EXPIRE_SECOND * 1000);
URL url = cosClient.generatePresignedUrl(obj.getBucket(),
normalizePrefix(fileName), expirationDate, HttpMethodName.PUT,
new HashMap<String, String>(), new HashMap<String, String>());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
index 6f8e332f6f4..053066c11bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java
@@ -55,8 +55,8 @@ public class ObsRemote extends DefaultRemote {
String sk = obj.getSk();
ObsClient obsClient = new ObsClient(ak, sk, endPoint);
- long expireSeconds = 3600L;
- TemporarySignatureRequest request = new
TemporarySignatureRequest(HttpMethodEnum.PUT, expireSeconds);
+ TemporarySignatureRequest request = new TemporarySignatureRequest(
+ HttpMethodEnum.PUT, SESSION_EXPIRE_SECOND);
request.setBucketName(obj.getBucket());
request.setObjectKey(normalizePrefix(fileName));
request.setHeaders(new HashMap<String, String>());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
index 139d2bbd415..42e019c77ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java
@@ -65,7 +65,7 @@ public class OssRemote extends DefaultRemote {
try {
GeneratePresignedUrlRequest request
= new GeneratePresignedUrlRequest(bucketName, objectName,
HttpMethod.PUT);
- Date expiration = new Date(new Date().getTime() + 3600 * 1000);
+ Date expiration = new Date(new Date().getTime() +
SESSION_EXPIRE_SECOND * 1000);
request.setExpiration(expiration);
URL signedUrl = ossClient.generatePresignedUrl(request);
return signedUrl.toString();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
index e146e52f534..b15c9fb35a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java
@@ -116,6 +116,8 @@ public abstract class RemoteBase {
public ObjectInfo obj;
+    protected static final long SESSION_EXPIRE_SECOND = 3600;
+
public RemoteBase(ObjectInfo obj) {
this.obj = obj;
}
@@ -149,6 +151,8 @@ public abstract class RemoteBase {
return new ObsRemote(obj);
case BOS:
return new BosRemote(obj);
+ case AZURE:
+ return new AzureRemote(obj);
default:
throw new Exception("current not support obj : " +
obj.toString());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
index 49d82ab5ae2..35e9dc7b39a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java
@@ -54,7 +54,7 @@ public class S3Remote extends DefaultRemote {
.build();
PutObjectPresignRequest presignRequest =
PutObjectPresignRequest.builder()
- .signatureDuration(Duration.ofMinutes(60))
+
.signatureDuration(Duration.ofSeconds(SESSION_EXPIRE_SECOND))
.putObjectRequest(objectRequest)
.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]