This is an automated email from the ASF dual-hosted git repository. casion pushed a commit to branch release-1.8.0 in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 59c29d7d48b4a3deb15c21fb0c4f07504affa100 Author: aiceflower <[email protected]> AuthorDate: Tue Sep 30 14:32:55 2025 +0800 fix azure compile error (#5264) * fix azure compile error * fix azure compile error * fix storage test error --------- Co-authored-by: aiceflower <[email protected]> --- linkis-commons/linkis-storage/pom.xml | 352 +++++------ .../factory/impl/BuildAzureBlobFileSystem.java | 58 +- .../storage/fs/impl/AzureBlobFileSystem.java | 694 +++++++++++---------- .../storage/utils/StorageConfigurationTest.scala | 167 ++--- 4 files changed, 650 insertions(+), 621 deletions(-) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 8715b97c7a..72ce14950c 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -1,176 +1,176 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.linkis</groupId> - <artifactId>linkis</artifactId> - <version>${revision}</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <artifactId>linkis-storage</artifactId> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.linkis</groupId> - <artifactId>linkis-common</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.linkis</groupId> - <artifactId>linkis-hadoop-common</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>${protobuf.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.poi</groupId> - <artifactId>poi</artifactId> - <version>${poi.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.poi</groupId> - <artifactId>poi-ooxml</artifactId> - <version>${poi.version}</version> - </dependency> - - <dependency> - <groupId>com.github.pjfanning</groupId> - <artifactId>excel-streaming-reader</artifactId> - <version>5.0.2</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-aliyun</artifactId> - <version>3.3.4</version> - </dependency> - <dependency> - <groupId>com.aliyun.oss</groupId> - <artifactId>aliyun-sdk-oss</artifactId> - <version>3.16.0</version> - </dependency> - <dependency> - <groupId>org.jdom</groupId> - <artifactId>jdom2</artifactId> - </dependency> - - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.12.261</version> - </dependency> - - <dependency> - <groupId>com.azure</groupId> - <artifactId>azure-storage-blob</artifactId> - </dependency> - <dependency> - <groupId>com.azure</groupId> - <artifactId>azure-storage-common</artifactId> - </dependency> - <dependency> - <groupId>com.azure</groupId> - <artifactId>azure-identity</artifactId> - </dependency> - <dependency> - <groupId>org.apache.parquet</groupId> - <artifactId>parquet-avro</artifactId> - <version>${parquet-avro.version}</version> - <scope>${storage.parquet.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - <scope>${storage.parquet.scope}</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <!-- for hadoop 3.3.3 --> - <exclusion> - <groupId>ch.qos.reload4j</groupId> - <artifactId>reload4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-reload4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.orc</groupId> - <artifactId>orc-core</artifactId> - <version>${orc-core.version}</version> - <classifier>nohive</classifier> - <scope>${storage.orc.scope}</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-storage-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.linkis</groupId> + <artifactId>linkis</artifactId> + <version>${revision}</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>linkis-storage</artifactId> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.linkis</groupId> + <artifactId>linkis-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.linkis</groupId> + <artifactId>linkis-hadoop-common</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.poi</groupId> + <artifactId>poi</artifactId> + <version>${poi.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.poi</groupId> + <artifactId>poi-ooxml</artifactId> + <version>${poi.version}</version> + </dependency> + + <dependency> + <groupId>com.github.pjfanning</groupId> + <artifactId>excel-streaming-reader</artifactId> + <version>5.0.2</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aliyun</artifactId> + <version>3.3.4</version> + </dependency> + <dependency> + <groupId>com.aliyun.oss</groupId> + <artifactId>aliyun-sdk-oss</artifactId> + <version>3.16.0</version> + </dependency> + <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom2</artifactId> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.12.261</version> + </dependency> + + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-blob</artifactId> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-common</artifactId> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet-avro.version}</version> + <scope>${storage.parquet.scope}</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>${storage.parquet.scope}</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <!-- for hadoop 3.3.3 --> + <exclusion> + <groupId>ch.qos.reload4j</groupId> + <artifactId>reload4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-reload4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>${orc-core.version}</version> + <classifier>nohive</classifier> + <scope>${storage.orc.scope}</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java index 292bb952ed..8da6541882 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java @@ -14,46 +14,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.linkis.storage.factory.impl; import org.apache.linkis.common.io.Fs; import org.apache.linkis.storage.factory.BuildFactory; import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem; import org.apache.linkis.storage.utils.StorageUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class BuildAzureBlobFileSystem implements BuildFactory { - private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class); - - @Override - public Fs getFs(String user, String proxyUser) { - AzureBlobFileSystem fs = new AzureBlobFileSystem(); - try { - fs.init(null); - } catch (IOException e) { - LOG.warn("get file system failed", e); - } - fs.setUser(user); - return fs; - } + private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class); - @Override - public Fs getFs(String user, String proxyUser, String label) { - AzureBlobFileSystem fs = new AzureBlobFileSystem(); - try { - fs.init(null); - } catch (IOException e) { - LOG.warn("get file system failed", e); - } - fs.setUser(user); - return fs; + @Override + public Fs getFs(String user, String proxyUser) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); } + fs.setUser(user); + return fs; + } - @Override - public String fsName() { - return StorageUtils.BLOB; + @Override + public Fs getFs(String user, String proxyUser, String label) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); } + fs.setUser(user); + return fs; + } + + @Override + public String fsName() { + return StorageUtils.BLOB(); + } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index 67475aecf2..35473a535f 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -17,15 +17,6 @@ package org.apache.linkis.storage.fs.impl; -import com.azure.core.util.polling.SyncPoller; -import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.models.BlobCopyInfo; -import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.specialized.BlobOutputStream; -import com.azure.storage.blob.specialized.BlockBlobClient; import org.apache.linkis.common.io.FsPath; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.fs.FileSystem; @@ -40,362 +31,397 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import com.azure.core.util.polling.SyncPoller; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobCopyInfo; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.BlockBlobClient; + import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; -import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA; public class AzureBlobFileSystem extends FileSystem { - private static final String SLASH = "/"; - - public static class PahtInfo { - private String schema = "http://"; // http - private String domain; // - private String container; // container name - private String blobName; // blob name - private String tail; - - public PahtInfo(String domain, String container, String blobName) { - this.domain = domain; - this.container = container; - this.blobName = blobName; - if (blobName != null) { - String[] names = blobName.split(SLASH, -1); - tail = names[names.length - 1]; - } - } - - public String toFullName() { - return schema + domain + SLASH + container + SLASH + blobName; - } - - public String getSchema() { - return schema; - } - - public String getDomain() { - return domain; - } - - public String getContainer() { - return container; - } - - public String getBlobName() { - return blobName; - } - - public String getTail() { - return tail; - } - - @Override - public String toString() { - return "PahtInfo{" + - "schema='" + schema + '\'' + - ", domain='" + domain + '\'' + - ", container='" + container + '\'' + - ", blobName='" + blobName + '\'' + - ", tail='" + tail + '\'' + - '}'; - } + private static final String SLASH = "/"; + + public static class PahtInfo { + private String schema = "http://"; // http + private String domain; // + private String container; // container name + private String blobName; // blob name + private String tail; + + public PahtInfo(String domain, String container, String blobName) { + this.domain = domain; + this.container = container; + this.blobName = blobName; + if (blobName != null) { + String[] names = blobName.split(SLASH, -1); + tail = names[names.length - 1]; + } } - /** - * manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 - */ - private BlobServiceClient serviceClient; - - /** - * getBlobContainerClient - * - * @param containerName - * @return client which can manipulate Azure Storage containers and their blobs.<br> - * 操作一个容器和其blobs的客户端 - */ - private BlobContainerClient getBlobContainerClient(String containerName) { - return serviceClient.getBlobContainerClient(containerName); + public String toFullName() { + return schema + domain + SLASH + container + SLASH + blobName; } - private PahtInfo azureLocation(String path) { - return this.azureLocation(new FsPath(path)); + public String getSchema() { + return schema; } - /** - * @param dest - * @return domain name,container name,blob name - */ - private PahtInfo azureLocation(FsPath dest) { - //https://myaccount.blob.core.windows.net/mycontainer/dir/blobname - // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname - String path = dest.getPath(); - // myaccount.blob.core.windows.net/mycontainer/dir/blobname - // will split to myaccount.blob.core.windows.net - // and mycontainer/dir/blobname - String[] paths = path.split(SLASH, 2); - if (paths.length < 2) { - throw new IllegalArgumentException("file path error,with out container:" + path); - } - // split to container and blob object, - // container/dir/blobname will split to container and dir/blobname - String[] names = paths[1].split(SLASH, 2); - if (names.length < 2) { - return new PahtInfo(paths[0], names[0], null); - } else { - return new PahtInfo(paths[0], names[0], names[1]); - } + public String getDomain() { + return domain; } - /** - * init serviceClient - * - * @param properties - * @throws IOException - */ - @Override - public void init(Map<String, String> properties) throws IOException { - - /** - * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 - */ - String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); - String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR.getValue(properties); - // Azure SDK client builders accept the credential as a parameter - serviceClient = - new BlobServiceClientBuilder() - .endpoint(BLOB_SCHEMA + acctName + ".blob.core.windows.net/") - .connectionString(connectStr) - .buildClient(); + public String getContainer() { + return container; } - /** - * name of the fileSystem - * - * @return - */ - @Override - public String fsName() { - return StorageUtils.BLOB; + public String getBlobName() { + return blobName; } - @Override - public String rootUserName() { - return ""; + public String getTail() { + return tail; } - /** - * @param dest - * @return - * @throws IOException - */ @Override - public FsPath get(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return path; - } else { - throw new StorageWarnException( - TO_BE_UNKNOW.getErrorCode(), - "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); - } + public String toString() { + return "PahtInfo{" + + "schema='" + + schema + + '\'' + + ", domain='" + + domain + + '\'' + + ", container='" + + container + + '\'' + + ", blobName='" + + blobName + + '\'' + + ", tail='" + + tail + + '\'' + + '}'; } - - /** - * Opens a blob input stream to download the blob. - * - * @param dest - * @return - * @throws BlobStorageException – If a storage service error occurred. - */ - @Override - public InputStream read(FsPath dest) { - PahtInfo result = azureLocation(dest); - BlobClient blobclient = - getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); - return blobclient.openInputStream(); + } + + /** manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 */ + private BlobServiceClient serviceClient; + + /** + * getBlobContainerClient + * + * @param containerName + * @return client which can manipulate Azure Storage containers and their blobs.<br> + * 操作一个容器和其blobs的客户端 + */ + private BlobContainerClient getBlobContainerClient(String containerName) { + return serviceClient.getBlobContainerClient(containerName); + } + + private PahtInfo azureLocation(String path) { + return this.azureLocation(new FsPath(path)); + } + + /** + * @param dest + * @return domain name,container name,blob name + */ + private PahtInfo azureLocation(FsPath dest) { + // https://myaccount.blob.core.windows.net/mycontainer/dir/blobname + // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname + String path = dest.getPath(); + // myaccount.blob.core.windows.net/mycontainer/dir/blobname + // will split to myaccount.blob.core.windows.net + // and mycontainer/dir/blobname + String[] paths = path.split(SLASH, 2); + if (paths.length < 2) { + throw new IllegalArgumentException("file path error,with out container:" + path); } - - /** - * @param dest - * @param overwrite - * @return - * @throws BlobStorageException – If a storage service error occurred. - * @see BlockBlobClient #getBlobOutputStream - */ - @Override - public OutputStream write(FsPath dest, boolean overwrite) { - - PahtInfo result = azureLocation(dest); - BlobClient blobclient = - getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); - return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + // split to container and blob object, + // container/dir/blobname will split to container and dir/blobname + String[] names = paths[1].split(SLASH, 2); + if (names.length < 2) { + return new PahtInfo(paths[0], names[0], null); + } else { + return new PahtInfo(paths[0], names[0], names[1]); } + } - /** - * create a blob<br> - * 创建一个对象("文件") - * - * @param dest - * @return - * @throws IOException - */ - @Override - public boolean create(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return false; - } - PahtInfo names = this.azureLocation(dest); - // TODO 如果是路径的话后面补一个文件. - if (!names.getTail().contains(".")) { - String tmp = names.toFullName() + SLASH + "_tmp.txt"; - names = this.azureLocation(tmp); - } - BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer()); - try (BlobOutputStream bos = - client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) { - bos.write(1); - bos.flush(); - } - - return true; - } + /** + * init serviceClient + * + * @param properties + * @throws IOException + */ + @Override + public void init(Map<String, String> properties) throws IOException { /** - * Flat listing 5000 results at a time,without deleted.<br> - * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? - * - * @param path - * @return - * @throws IOException + * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 */ - @Override - public List<FsPath> list(FsPath path) throws IOException { - final PahtInfo result = azureLocation(path); - return getBlobContainerClient(result.getContainer()).listBlobs().stream() - // Azure不会返回已删除对象 - .filter(item -> !item.isDeleted()) - .map(item -> { - FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName()); - // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明 - if (item.getProperties().getContentType() == null) { - tmp.setIsdir(true); - } - return tmp; - }) - .collect(Collectors.toList()); + String acctName = StorageConfiguration.AZURE_ACCT_NAME().getValue(properties); + String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR().getValue(properties); + // Azure SDK client builders accept the credential as a parameter + serviceClient = + new BlobServiceClientBuilder() + .endpoint(StorageUtils.BLOB_SCHEMA() + acctName + ".blob.core.windows.net/") + .connectionString(connectStr) + .buildClient(); + } + + /** + * name of the fileSystem + * + * @return + */ + @Override + public String fsName() { + return StorageUtils.BLOB(); + } + + @Override + public String rootUserName() { + return ""; + } + + /** + * @param dest + * @return + * @throws IOException + */ + @Override + public FsPath get(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return path; + } else { + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); } - - @Override - public boolean canRead(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; - } - } - - @Override - public boolean canWrite(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; - } - } - - @Override - public boolean exists(FsPath dest) throws IOException { - PahtInfo file = this.azureLocation(dest); - return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); - } - - @Override - public boolean delete(FsPath dest) throws IOException { - PahtInfo file = this.azureLocation(dest); - return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).deleteIfExists(); - } - - @Override - public boolean copy(String origin, String dest) throws IOException { - PahtInfo oriNames = this.azureLocation(origin); - PahtInfo destNames = this.azureLocation(dest); - - BlobClient oriClient = - getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName()); - BlockBlobClient destClient = - getBlobContainerClient(destNames.getContainer()) - .getBlobClient(destNames.getBlobName()) - .getBlockBlobClient(); - SyncPoller<BlobCopyInfo, Void> poller = destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2)); - poller.waitForCompletion(); - return true; - } - - @Override - public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { - // 没有事务性保证 - this.copy(oldDest.getPath(), newDest.getPath()); - this.delete(oldDest); - return true; - } - - @Override - public boolean mkdir(FsPath dest) throws IOException { - return this.create(dest.getPath()); - } - - @Override - public boolean mkdirs(FsPath dest) throws IOException { - return this.mkdir(dest); - } - - // 下面这些方法可能都无法支持 - @Override - public String listRoot() throws IOException { - return ""; - } - - @Override - public long getTotalSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public long getFreeSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public long getUsableSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public boolean canExecute(FsPath dest) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user, String group) throws IOException { - return false; + } + + /** + * Opens a blob input stream to download the blob. + * + * @param dest + * @return + * @throws BlobStorageException – If a storage service error occurred. + */ + @Override + public InputStream read(FsPath dest) { + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.openInputStream(); + } + + /** + * @param dest + * @param overwrite + * @return + * @throws BlobStorageException – If a storage service error occurred. + * @see BlockBlobClient #getBlobOutputStream + */ + @Override + public OutputStream write(FsPath dest, boolean overwrite) { + + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + } + + /** + * create a blob<br> + * 创建一个对象("文件") + * + * @param dest + * @return + * @throws IOException + */ + @Override + public boolean create(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return false; } - - @Override - public boolean setOwner(FsPath dest, String user) throws IOException { - return false; + PahtInfo names = this.azureLocation(dest); + // TODO 如果是路径的话后面补一个文件. + if (!names.getTail().contains(".")) { + String tmp = names.toFullName() + SLASH + "_tmp.txt"; + names = this.azureLocation(tmp); } - - @Override - public boolean setGroup(FsPath dest, String group) throws IOException { - return false; + BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer()); + try (BlobOutputStream bos = + client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) { + bos.write(1); + bos.flush(); } - @Override - public boolean setPermission(FsPath dest, String permission) throws IOException { - return false; + return true; + } + + /** + * Flat listing 5000 results at a time,without deleted.<br> + * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? + * + * @param path + * @return + * @throws IOException + */ + @Override + public List<FsPath> list(FsPath path) throws IOException { + final PahtInfo result = azureLocation(path); + return getBlobContainerClient(result.getContainer()).listBlobs().stream() + // Azure不会返回已删除对象 + .filter(item -> !item.isDeleted()) + .map( + item -> { + FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName()); + // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明 + if (item.getProperties().getContentType() == null) { + tmp.setIsdir(true); + } + return tmp; + }) + .collect(Collectors.toList()); + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; } - - @Override - public void close() throws IOException { + } + + @Override + public boolean canRead(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; } + } + + @Override + public boolean exists(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); + } + + @Override + public boolean delete(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()) + .getBlobClient(file.getBlobName()) + .deleteIfExists(); + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + PahtInfo oriNames = this.azureLocation(origin); + PahtInfo destNames = this.azureLocation(dest); + + BlobClient oriClient = + getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName()); + BlockBlobClient destClient = + getBlobContainerClient(destNames.getContainer()) + .getBlobClient(destNames.getBlobName()) + .getBlockBlobClient(); + SyncPoller<BlobCopyInfo, Void> poller = + destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2)); + poller.waitForCompletion(); + return true; + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + // 没有事务性保证 + this.copy(oldDest.getPath(), newDest.getPath()); + this.delete(oldDest); + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + return this.create(dest.getPath()); + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return this.mkdir(dest); + } + + // 下面这些方法可能都无法支持 + @Override + public String listRoot() throws IOException { + return ""; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getLength(FsPath dest) throws IOException { + return 0; + } + + @Override + public String checkSum(FsPath dest) throws IOException { + return null; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return false; + } + + @Override + public void close() throws IOException {} } diff --git a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala index a821038005..e5adef9124 100644 --- a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala +++ b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala @@ -1,83 +1,84 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.storage.utils - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class StorageConfigurationTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val storagerootuser = StorageConfiguration.STORAGE_ROOT_USER.getValue - val hdfsrootuser = StorageConfiguration.HDFS_ROOT_USER.getValue - val localrootuser = StorageConfiguration.LOCAL_ROOT_USER.getValue - val storageusergroup = StorageConfiguration.STORAGE_USER_GROUP.getValue - val storagersfiletype = StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue - val storagersfilesuffix = StorageConfiguration.STORAGE_RS_FILE_SUFFIX.getValue - val types = StorageConfiguration.ResultTypes - val storageresultsetpackage = StorageConfiguration.STORAGE_RESULT_SET_PACKAGE.getValue - val storageresultsetclasses = StorageConfiguration.STORAGE_RESULT_SET_CLASSES.getValue - val storagebuildfsclasses = StorageConfiguration.STORAGE_BUILD_FS_CLASSES.getValue - val issharenode = StorageConfiguration.IS_SHARE_NODE.getValue - val enableioproxy = StorageConfiguration.ENABLE_IO_PROXY.getValue - val ioUser = StorageConfiguration.IO_USER.getValue - val iofsexpiretime = StorageConfiguration.IO_FS_EXPIRE_TIME.getValue - val iodefaultcreator = StorageConfiguration.IO_DEFAULT_CREATOR.getValue - val iofsreinit = StorageConfiguration.IO_FS_RE_INIT.getValue - val ioinitretrylimit = StorageConfiguration.IO_INIT_RETRY_LIMIT.getValue - val storagehdfsgroup = StorageConfiguration.STORAGE_HDFS_GROUP.getValue - val doublefractionlen = StorageConfiguration.DOUBLE_FRACTION_LEN.getValue - val hdfspathprefixcheckon = StorageConfiguration.HDFS_PATH_PREFIX_CHECK_ON.getValue - val hdfspathprefixremove = StorageConfiguration.HDFS_PATH_PREFIX_REMOVE.getValue - val fschecksumdisbale = StorageConfiguration.FS_CHECKSUM_DISBALE - - Assertions.assertEquals("hadoop", storagerootuser) - Assertions.assertEquals("hadoop", hdfsrootuser) - Assertions.assertEquals("root", localrootuser) - Assertions.assertEquals("bdap", storageusergroup) - Assertions.assertEquals("utf-8", storagersfiletype) - Assertions.assertEquals(".dolphin", storagersfilesuffix) - Assertions.assertTrue(types.size > 0) - Assertions.assertEquals("org.apache.linkis.storage.resultset", storageresultsetpackage) - Assertions.assertEquals( - "txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet", - storageresultsetclasses - ) - Assertions.assertEquals( - "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + - "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem", - storagebuildfsclasses - ) - Assertions.assertTrue(issharenode) - Assertions.assertFalse(enableioproxy) - Assertions.assertEquals("root", ioUser) - Assertions.assertTrue(600000 == iofsexpiretime) - Assertions.assertEquals("IDE", iodefaultcreator) - Assertions.assertEquals("re-init", iofsreinit) - Assertions.assertTrue(10 == ioinitretrylimit) - Assertions.assertEquals("hadoop", storagehdfsgroup) - Assertions.assertTrue(30 == doublefractionlen) - Assertions.assertTrue(hdfspathprefixcheckon) - Assertions.assertTrue(hdfspathprefixremove) - Assertions.assertFalse(fschecksumdisbale) - - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.utils + +import org.junit.jupiter.api.{Assertions, DisplayName, Test} + +class StorageConfigurationTest { + + @Test + @DisplayName("constTest") + def constTest(): Unit = { + + val storagerootuser = StorageConfiguration.STORAGE_ROOT_USER.getValue + val hdfsrootuser = StorageConfiguration.HDFS_ROOT_USER.getValue + val localrootuser = StorageConfiguration.LOCAL_ROOT_USER.getValue + val storageusergroup = StorageConfiguration.STORAGE_USER_GROUP.getValue + val storagersfiletype = StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue + val storagersfilesuffix = StorageConfiguration.STORAGE_RS_FILE_SUFFIX.getValue + val types = StorageConfiguration.ResultTypes + val storageresultsetpackage = StorageConfiguration.STORAGE_RESULT_SET_PACKAGE.getValue + val storageresultsetclasses = StorageConfiguration.STORAGE_RESULT_SET_CLASSES.getValue + val storagebuildfsclasses = StorageConfiguration.STORAGE_BUILD_FS_CLASSES.getValue + val issharenode = StorageConfiguration.IS_SHARE_NODE.getValue + val enableioproxy = StorageConfiguration.ENABLE_IO_PROXY.getValue + val ioUser = StorageConfiguration.IO_USER.getValue + val iofsexpiretime = StorageConfiguration.IO_FS_EXPIRE_TIME.getValue + val iodefaultcreator = StorageConfiguration.IO_DEFAULT_CREATOR.getValue + val iofsreinit = StorageConfiguration.IO_FS_RE_INIT.getValue + val ioinitretrylimit = StorageConfiguration.IO_INIT_RETRY_LIMIT.getValue + val storagehdfsgroup = StorageConfiguration.STORAGE_HDFS_GROUP.getValue + val doublefractionlen = StorageConfiguration.DOUBLE_FRACTION_LEN.getValue + val hdfspathprefixcheckon = StorageConfiguration.HDFS_PATH_PREFIX_CHECK_ON.getValue + val hdfspathprefixremove = StorageConfiguration.HDFS_PATH_PREFIX_REMOVE.getValue + val fschecksumdisbale = StorageConfiguration.FS_CHECKSUM_DISBALE + + Assertions.assertEquals("hadoop", storagerootuser) + Assertions.assertEquals("hadoop", hdfsrootuser) + Assertions.assertEquals("root", localrootuser) + Assertions.assertEquals("bdap", storageusergroup) + Assertions.assertEquals("utf-8", storagersfiletype) + Assertions.assertEquals(".dolphin", storagersfilesuffix) + Assertions.assertTrue(types.size > 0) + Assertions.assertEquals("org.apache.linkis.storage.resultset", storageresultsetpackage) + Assertions.assertEquals( + "txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet", + storageresultsetclasses + ) + Assertions.assertEquals( + "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," + + "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem", + storagebuildfsclasses + ) + Assertions.assertTrue(issharenode) + Assertions.assertFalse(enableioproxy) + Assertions.assertEquals("root", ioUser) + Assertions.assertTrue(600000 == iofsexpiretime) + Assertions.assertEquals("IDE", iodefaultcreator) + Assertions.assertEquals("re-init", iofsreinit) + Assertions.assertTrue(10 == ioinitretrylimit) + Assertions.assertEquals("hadoop", storagehdfsgroup) + Assertions.assertTrue(30 == doublefractionlen) + Assertions.assertTrue(hdfspathprefixcheckon) + Assertions.assertTrue(hdfspathprefixremove) + Assertions.assertFalse(fschecksumdisbale) + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
