This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dbd98e07cb3 [enhance](Azure) Use s3Uri to specify the object's bucket
and key for azure in FE (#37308)
dbd98e07cb3 is described below
commit dbd98e07cb3ff37512bc0e20e7f421ff5cf8cfde
Author: AlexYue <[email protected]>
AuthorDate: Thu Jul 4 23:21:46 2024 +0800
[enhance](Azure) Use s3Uri to specify the object's bucket and key for azure
in FE (#37308)
Previously, when using S3 load on Azure Blob Storage, the user had to specify
the s3.bucket property. However, the bucket information can be obtained
directly from the data infile URI.
---
.../org/apache/doris/fs/obj/AzureObjStorage.java | 154 ++++++++++++++++++---
.../apache/doris/fs/remote/AzureFileSystem.java | 74 +---------
2 files changed, 138 insertions(+), 90 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
index 2d55f7dd477..358b66b44b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
@@ -20,7 +20,10 @@ package org.apache.doris.fs.obj;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3URI;
+import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.remote.RemoteFile;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
@@ -28,7 +31,8 @@ import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
@@ -45,23 +49,41 @@ import org.jetbrains.annotations.Nullable;
import java.io.File;
import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
+public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
private static final Logger LOG =
LogManager.getLogger(AzureObjStorage.class);
+ private static final String URI_TEMPLATE =
"https://%s.blob.core.windows.net";
protected Map<String, String> properties;
- private BlobContainerClient client;
+ private BlobServiceClient client;
+ private boolean isUsePathStyle = false;
- private static final String URI_TEMPLATE =
"https://%s.blob.core.windows.net/%s";
+ private boolean forceParsingByStandardUri = false;
public AzureObjStorage(Map<String, String> properties) {
this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
setProperties(properties);
}
+ // Paths supplied by users keep the S3-compatible form 'S3://${containerName}/...'.
+ // Azure only needs the part after the container, so the scheme and the
+ // container segment are stripped here.
+ private static String removeUselessSchema(String remotePath) {
+ final String scheme = "s3://";
+
+ String withoutScheme = remotePath.startsWith(scheme)
+ ? remotePath.substring(scheme.length())
+ : remotePath;
+ // Everything up to and including the first '/' is the container name.
+ // When there is no '/', indexOf returns -1 and the string is returned unchanged.
+ return withoutScheme.substring(withoutScheme.indexOf('/') + 1);
+ }
+
public Map<String, String> getProperties() {
return properties;
}
@@ -73,16 +95,29 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
} catch (DdlException e) {
throw new IllegalArgumentException(e);
}
+ // Virtual hosted-style is recommended in the s3 protocol.
+ // The path-style has been abandoned, but for some unexplainable
reasons,
+ // the s3 client will determine whether the endpoint starts with `s3`
+ // when generating a virtual hosted-style request.
+ // If not, it will not be converted (
https://github.com/aws/aws-sdk-java-v2/pull/763),
+ // but the endpoints of many cloud service providers for object
storage do not start with s3,
+ // so they cannot be converted to virtual hosted-style.
+ // Some of them, such as aliyun's oss, only support virtual
hosted-style,
+ // and some of them(ceph) may only support
+ // path-style, so we need to do some additional conversion.
+ isUsePathStyle =
this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
+ .equalsIgnoreCase("true");
+ forceParsingByStandardUri =
this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI,
+ "false").equalsIgnoreCase("true");
}
@Override
- public BlobContainerClient getClient() throws UserException {
+ public BlobServiceClient getClient() throws UserException {
if (client == null) {
- String containerName = properties.get(S3Properties.BUCKET);
- String uri = String.format(URI_TEMPLATE,
properties.get(S3Properties.ACCESS_KEY), containerName);
+ String uri = String.format(URI_TEMPLATE,
properties.get(S3Properties.ACCESS_KEY));
StorageSharedKeyCredential cred = new
StorageSharedKeyCredential(properties.get(S3Properties.ACCESS_KEY),
properties.get(S3Properties.SECRET_KEY));
- BlobContainerClientBuilder builder = new
BlobContainerClientBuilder();
+ BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
builder.credential(cred);
builder.endpoint(uri);
client = builder.buildClient();
@@ -97,8 +132,9 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
@Override
public Status headObject(String remotePath) {
- BlobClient blobClient = client.getBlobClient(remotePath);
try {
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ BlobClient blobClient =
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
BlobProperties properties = blobClient.getProperties();
LOG.info("head file {} success: {}", remotePath,
properties.toString());
return Status.OK;
@@ -110,13 +146,17 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
return new Status(Status.ErrCode.COMMON_ERROR, "headObject "
+ remotePath + " failed: " + e.getMessage());
}
+ } catch (UserException e) {
+ return new Status(Status.ErrCode.COMMON_ERROR, "headObject "
+ + remotePath + " failed: " + e.getMessage());
}
}
@Override
public Status getObject(String remoteFilePath, File localFile) {
try {
- BlobClient blobClient = client.getBlobClient(remoteFilePath);
+ S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle,
forceParsingByStandardUri);
+ BlobClient blobClient =
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
BlobProperties properties =
blobClient.downloadToFile(localFile.getAbsolutePath());
LOG.info("get file " + remoteFilePath + " success: " +
properties.toString());
return Status.OK;
@@ -124,26 +164,34 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from azure error: " + e.getServiceMessage());
+ } catch (UserException e) {
+ return new Status(Status.ErrCode.COMMON_ERROR, "getObject "
+ + remoteFilePath + " failed: " + e.getMessage());
}
}
@Override
public Status putObject(String remotePath, @Nullable InputStream content,
long contentLength) {
try {
- BlobClient blobClient = client.getBlobClient(remotePath);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ BlobClient blobClient =
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
blobClient.upload(content, contentLength);
return Status.OK;
} catch (BlobStorageException e) {
return new Status(
Status.ErrCode.COMMON_ERROR,
"Error occurred while copying the blob:: " +
e.getServiceMessage());
+ } catch (UserException e) {
+ return new Status(Status.ErrCode.COMMON_ERROR, "putObject "
+ + remotePath + " failed: " + e.getMessage());
}
}
@Override
public Status deleteObject(String remotePath) {
try {
- BlobClient blobClient = client.getBlobClient(remotePath);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ BlobClient blobClient =
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
blobClient.delete();
LOG.info("delete file " + remotePath + " success");
return Status.OK;
@@ -151,12 +199,18 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from azure error: " + e.getServiceMessage());
+ } catch (UserException e) {
+ return new Status(Status.ErrCode.COMMON_ERROR, "deleteObject "
+ + remotePath + " failed: " + e.getMessage());
}
}
@Override
public Status deleteObjects(String remotePath) {
try {
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ BlobContainerClient blobClient =
getClient().getBlobContainerClient(uri.getBucket());
+ String containerUrl = blobClient.getBlobContainerUrl();
String continuationToken = "";
boolean isTruncated = false;
long totalObjects = 0;
@@ -165,11 +219,11 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
List<RemoteObject> objectList = objects.getObjectList();
if (!objectList.isEmpty()) {
BlobBatchClient blobBatchClient = new
BlobBatchClientBuilder(
- client.getServiceClient()).buildClient();
+ getClient()).buildClient();
BlobBatch blobBatch = blobBatchClient.getBlobBatch();
for (RemoteObject blob : objectList) {
- blobBatch.deleteBlob(client.getBlobContainerUrl(),
blob.getKey());
+ blobBatch.deleteBlob(containerUrl, blob.getKey());
}
Response<Void> resp =
blobBatchClient.submitBatchWithResponse(blobBatch, true, null, Context.NONE);
LOG.info("{} objects deleted for dir {} return http code
{}",
@@ -193,15 +247,22 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
+ // Issues a copy of the blob addressed by origFilePath to the blob addressed by
+ // destFilePath. Both paths are parsed with S3URI to obtain the container
+ // (bucket) and the blob key, so source and destination may live in different
+ // containers.
+ // Returns Status.OK once beginCopy has been called, or a COMMON_ERROR status
+ // when a BlobStorageException or UserException is raised.
+ // NOTE(review): the result of beginCopy is not inspected or waited on here, so
+ // "Blob copied" is presumably logged before the copy has finished - confirm.
@Override
public Status copyObject(String origFilePath, String destFilePath) {
try {
- BlobClient sourceBlobClient = client.getBlobClient(origFilePath);
- BlobClient destinationBlobClient =
client.getBlobClient(destFilePath);
+ S3URI origUri = S3URI.create(origFilePath, isUsePathStyle,
forceParsingByStandardUri);
+ S3URI destUri = S3URI.create(destFilePath, isUsePathStyle,
forceParsingByStandardUri);
+ BlobClient sourceBlobClient =
getClient().getBlobContainerClient(origUri.getBucket())
+ .getBlobClient(origUri.getKey());
+ BlobClient destinationBlobClient =
getClient().getBlobContainerClient(destUri.getBucket())
+ .getBlobClient(destUri.getKey());
destinationBlobClient.beginCopy(sourceBlobClient.getBlobUrl(),
null);
- System.out.println("Blob copied from " + origFilePath + " to " +
destFilePath);
+ LOG.info("Blob copied from " + origFilePath + " to " +
destFilePath);
return Status.OK;
} catch (BlobStorageException e) {
return new Status(
Status.ErrCode.COMMON_ERROR,
"Error occurred while copying the blob:: " +
e.getServiceMessage());
+ } catch (UserException e) {
+ // The separator needs a leading space, otherwise the source path and "to" run together.
+ return new Status(Status.ErrCode.COMMON_ERROR, "copyObject from "
+ + origFilePath + " to " + destFilePath + " failed: " +
e.getMessage());
}
}
@@ -209,7 +270,9 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
public RemoteObjects listObjects(String remotePath, String
continuationToken) throws DdlException {
try {
ListBlobsOptions options = new
ListBlobsOptions().setPrefix(remotePath);
- PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options,
continuationToken, null);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ PagedIterable<BlobItem> pagedBlobs =
getClient().getBlobContainerClient(uri.getBucket())
+ .listBlobs(options, continuationToken, null);
PagedResponse<BlobItem> pagedResponse =
pagedBlobs.iterableByPage().iterator().next();
List<RemoteObject> remoteObjects = new ArrayList<>();
@@ -222,6 +285,61 @@ public class AzureObjStorage implements
ObjStorage<BlobContainerClient> {
} catch (BlobStorageException e) {
LOG.warn(String.format("Failed to list objects for S3: %s",
remotePath), e);
throw new DdlException("Failed to list objects for S3, Error
message: " + e.getMessage(), e);
+ } catch (UserException e) {
+ LOG.warn(String.format("Failed to list objects for S3: %s",
remotePath), e);
+ throw new DdlException("Failed to list objects for S3, Error
message: " + e.getMessage(), e);
+ }
+ }
+
+ // For historical reasons the BE expects object storage paths to start with
+ // 'S3://${containerName}', so the path is rebuilt here from the bucket and
+ // the file name in a form the BE can parse.
+ private String constructS3Path(String fileName, String bucket) throws UserException {
+ final String s3Path = String.format("s3://%s/%s", bucket, fileName);
+ LOG.info("the path is {}", s3Path);
+ return s3Path;
+ }
+
+ // Returns the part of a glob pattern that precedes its first glob
+ // metacharacter ('*', '?', '[', '{' or '\'). A pattern without any
+ // metacharacter is returned unchanged.
+ private static String literalPrefixOfGlob(String globPattern) {
+ for (int i = 0; i < globPattern.length(); i++) {
+ char c = globPattern.charAt(i);
+ if (c == '*' || c == '?' || c == '[' || c == '{' || c == '\\') {
+ return globPattern.substring(0, i);
+ }
+ }
+ return globPattern;
+ }
+
+ // Lists the blobs whose names match the glob pattern contained in remotePath
+ // and appends one RemoteFile per match to result.
+ //
+ // remotePath   path parsed with S3URI; its bucket selects the container and
+ //              its key is used as the glob pattern
+ // result       list that receives the matching entries
+ // fileNameOnly when true only the last path element is stored as the name,
+ //              otherwise a full 's3://bucket/key' path is stored
+ //
+ // Returns Status.OK on success, or a COMMON_ERROR status describing the
+ // failure; exceptions are not propagated to the caller.
+ public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+ try {
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+ String globPath = uri.getKey();
+ LOG.info("try to glob list for azure, remote path {}, orig {}", globPath, remotePath);
+ // Named containerClient so that it does not shadow the BlobServiceClient field 'client'.
+ BlobContainerClient containerClient = getClient().getBlobContainerClient(uri.getBucket());
+ java.nio.file.Path pathPattern = Paths.get(globPath);
+ LOG.info("path pattern {}", pathPattern.toString());
+ PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());
+
+ // The listing prefix is compared literally by the service, so only the
+ // wildcard-free head of the pattern may be used to narrow the listing;
+ // the full pattern is applied below by the matcher.
+ ListBlobsOptions options = new ListBlobsOptions().setPrefix(literalPrefixOfGlob(globPath));
+ String newContinuationToken = null;
+ do {
+ // Fetch one page per iteration, resuming from the previous page's token.
+ PagedIterable<BlobItem> pagedBlobs = containerClient.listBlobs(options, newContinuationToken, null);
+ PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
+
+ for (BlobItem blobItem : pagedResponse.getElements()) {
+ java.nio.file.Path blobPath = Paths.get(blobItem.getName());
+
+ if (matcher.matches(blobPath)) {
+ // getSecond() would only yield the second-of-minute (0-59);
+ // toEpochSecond() gives an actual timestamp.
+ // NOTE(review): confirm the unit RemoteFile expects; if it is
+ // milliseconds use toInstant().toEpochMilli() instead.
+ RemoteFile remoteFile = new RemoteFile(
+ fileNameOnly ? blobPath.getFileName().toString()
+ : constructS3Path(blobPath.toString(), uri.getBucket()),
+ !blobItem.isPrefix(),
+ blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
+ blobItem.getProperties().getContentLength(),
+ blobItem.getProperties().getLastModified().toEpochSecond());
+ result.add(remoteFile);
+ }
+ }
+ newContinuationToken = pagedResponse.getContinuationToken();
+ } while (newContinuationToken != null);
+
+ } catch (BlobStorageException e) {
+ LOG.warn("glob file " + remotePath + " failed because azure error: " + e.getMessage());
+ return new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath
+ + " failed because azure error: " + e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("errors while glob file " + remotePath, e);
+ // Separator added so the path and the exception text do not run together.
+ return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob file " + remotePath
+ + ": " + e.getMessage());
}
+ return Status.OK;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
index 39dd9c2fe42..5004cfd2f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
@@ -23,20 +23,11 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.obj.AzureObjStorage;
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.core.http.rest.PagedResponse;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.BlobStorageException;
-import com.azure.storage.blob.models.ListBlobsOptions;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.nio.file.FileSystems;
-import java.nio.file.PathMatcher;
-import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@@ -63,70 +54,9 @@ public class AzureFileSystem extends ObjFileSystem {
return null;
}
- // To ensure compatibility with S3 usage, the path passed by the user
still starts with 'S3://${containerName}'.
- // For Azure, we need to remove this part.
- private static String removeUselessSchema(String remotePath) {
- String prefix = "s3://";
-
- if (remotePath.startsWith(prefix)) {
- remotePath = remotePath.substring(prefix.length());
- }
- // Remove the useless container name
- int firstSlashIndex = remotePath.indexOf('/');
- return remotePath.substring(firstSlashIndex + 1);
- }
-
- // Due to historical reasons, when the BE parses the object storage path.
- // It assumes the path starts with 'S3://${containerName}'
- // So here the path needs to be constructed in a format that BE can parse.
- private String constructS3Path(String fileName) throws UserException {
- BlobContainerClient client = (BlobContainerClient)
getObjStorage().getClient();
- String bucket = client.getBlobContainerName();
- LOG.info("the path is {}", String.format("s3://%s/%s", bucket,
fileName));
- return String.format("s3://%s/%s", bucket, fileName);
- }
-
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
- String copyPath = new String(remotePath);
- copyPath = removeUselessSchema(copyPath);
- LOG.info("try to glob list for azure, remote path {}, orig {}",
copyPath, remotePath);
- try {
- BlobContainerClient client = (BlobContainerClient)
getObjStorage().getClient();
- java.nio.file.Path pathPattern = Paths.get(copyPath);
- LOG.info("path pattern {}", pathPattern.toString());
- PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());
-
- ListBlobsOptions options = new
ListBlobsOptions().setPrefix(copyPath);
- String newContinuationToken = null;
- do {
- PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options,
newContinuationToken, null);
- PagedResponse<BlobItem> pagedResponse =
pagedBlobs.iterableByPage().iterator().next();
-
- for (BlobItem blobItem : pagedResponse.getElements()) {
- java.nio.file.Path blobPath =
Paths.get(blobItem.getName());
-
- if (matcher.matches(blobPath)) {
- RemoteFile remoteFile = new RemoteFile(
- fileNameOnly ?
blobPath.getFileName().toString() : constructS3Path(blobPath.toString()),
- !blobItem.isPrefix(),
- blobItem.isPrefix() ? -1 :
blobItem.getProperties().getContentLength(),
- blobItem.getProperties().getContentLength(),
-
blobItem.getProperties().getLastModified().getSecond());
- result.add(remoteFile);
- }
- }
- newContinuationToken = pagedResponse.getContinuationToken();
- } while (newContinuationToken != null);
-
- } catch (BlobStorageException e) {
- LOG.warn("glob file " + remotePath + " failed because azure error:
" + e.getMessage());
- return new Status(Status.ErrCode.COMMON_ERROR, "glob file " +
remotePath
- + " failed because azure error: " + e.getMessage());
- } catch (Exception e) {
- LOG.warn("errors while glob file " + remotePath, e);
- return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob
file " + remotePath + e.getMessage());
- }
- return Status.OK;
+ AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+ return azureObjStorage.globList(remotePath, result, fileNameOnly);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]