This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.8.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.8.0 by this push:
new 59c29d7d48 fix azure compile error (#5264)
59c29d7d48 is described below
commit 59c29d7d48b4a3deb15c21fb0c4f07504affa100
Author: aiceflower <[email protected]>
AuthorDate: Tue Sep 30 14:32:55 2025 +0800
fix azure compile error (#5264)
* fix azure compile error
* fix azure compile error
* fix storage test error
---------
Co-authored-by: aiceflower <[email protected]>
---
linkis-commons/linkis-storage/pom.xml | 352 +++++------
.../factory/impl/BuildAzureBlobFileSystem.java | 58 +-
.../storage/fs/impl/AzureBlobFileSystem.java | 694 +++++++++++----------
.../storage/utils/StorageConfigurationTest.scala | 167 ++---
4 files changed, 650 insertions(+), 621 deletions(-)
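
The compile error traces to how Scala objects surface in Java: a val defined
on a Scala object (here StorageUtils and StorageConfiguration in
linkis-common) compiles to a zero-argument accessor method plus a static
forwarder, not to a public field, so Java callers must write
StorageUtils.BLOB() rather than StorageUtils.BLOB. A minimal sketch of the
pattern (hypothetical names, not the Linkis sources):

    // Given a Scala object such as:
    //   object Constants { val BLOB: String = "blob" }
    // scalac emits a static forwarder method BLOB() on class Constants,
    // so from Java the val is reachable only as a method call:
    public class Caller {
      public String fsName() {
        return Constants.BLOB(); // method call, not a field read
        // return Constants.BLOB;  // would fail: cannot find symbol
      }
    }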
diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml
index 8715b97c7a..72ce14950c 100644
--- a/linkis-commons/linkis-storage/pom.xml
+++ b/linkis-commons/linkis-storage/pom.xml
@@ -1,176 +1,176 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.linkis</groupId>
- <artifactId>linkis</artifactId>
- <version>${revision}</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <artifactId>linkis-storage</artifactId>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.linkis</groupId>
- <artifactId>linkis-common</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.linkis</groupId>
- <artifactId>linkis-hadoop-common</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.poi</groupId>
- <artifactId>poi</artifactId>
- <version>${poi.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.poi</groupId>
- <artifactId>poi-ooxml</artifactId>
- <version>${poi.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.github.pjfanning</groupId>
- <artifactId>excel-streaming-reader</artifactId>
- <version>5.0.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aliyun</artifactId>
- <version>3.3.4</version>
- </dependency>
- <dependency>
- <groupId>com.aliyun.oss</groupId>
- <artifactId>aliyun-sdk-oss</artifactId>
- <version>3.16.0</version>
- </dependency>
- <dependency>
- <groupId>org.jdom</groupId>
- <artifactId>jdom2</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-s3</artifactId>
- <version>1.12.261</version>
- </dependency>
-
- <dependency>
- <groupId>com.azure</groupId>
- <artifactId>azure-storage-blob</artifactId>
- </dependency>
- <dependency>
- <groupId>com.azure</groupId>
- <artifactId>azure-storage-common</artifactId>
- </dependency>
- <dependency>
- <groupId>com.azure</groupId>
- <artifactId>azure-identity</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-avro</artifactId>
- <version>${parquet-avro.version}</version>
- <scope>${storage.parquet.scope}</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- <scope>${storage.parquet.scope}</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <!-- for hadoop 3.3.3 -->
- <exclusion>
- <groupId>ch.qos.reload4j</groupId>
- <artifactId>reload4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-reload4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.orc</groupId>
- <artifactId>orc-core</artifactId>
- <version>${orc-core.version}</version>
- <classifier>nohive</classifier>
- <scope>${storage.orc.scope}</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-storage-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis</artifactId>
+ <version>${revision}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <artifactId>linkis-storage</artifactId>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-hadoop-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.poi</groupId>
+ <artifactId>poi</artifactId>
+ <version>${poi.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.poi</groupId>
+ <artifactId>poi-ooxml</artifactId>
+ <version>${poi.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.pjfanning</groupId>
+ <artifactId>excel-streaming-reader</artifactId>
+ <version>5.0.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aliyun</artifactId>
+ <version>3.3.4</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ <version>3.16.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jdom</groupId>
+ <artifactId>jdom2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.12.261</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-identity</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet-avro.version}</version>
+ <scope>${storage.parquet.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>${storage.parquet.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <!-- for hadoop 3.3.3 -->
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>${orc-core.version}</version>
+ <classifier>nohive</classifier>
+ <scope>${storage.orc.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java
index 292bb952ed..8da6541882 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java
@@ -14,46 +14,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.linkis.storage.factory.impl;
import org.apache.linkis.common.io.Fs;
import org.apache.linkis.storage.factory.BuildFactory;
import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem;
import org.apache.linkis.storage.utils.StorageUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class BuildAzureBlobFileSystem implements BuildFactory {
- private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class);
-
- @Override
- public Fs getFs(String user, String proxyUser) {
- AzureBlobFileSystem fs = new AzureBlobFileSystem();
- try {
- fs.init(null);
- } catch (IOException e) {
- LOG.warn("get file system failed", e);
- }
- fs.setUser(user);
- return fs;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class);
- @Override
- public Fs getFs(String user, String proxyUser, String label) {
- AzureBlobFileSystem fs = new AzureBlobFileSystem();
- try {
- fs.init(null);
- } catch (IOException e) {
- LOG.warn("get file system failed", e);
- }
- fs.setUser(user);
- return fs;
+ @Override
+ public Fs getFs(String user, String proxyUser) {
+ AzureBlobFileSystem fs = new AzureBlobFileSystem();
+ try {
+ fs.init(null);
+ } catch (IOException e) {
+ LOG.warn("get file system failed", e);
}
+ fs.setUser(user);
+ return fs;
+ }
- @Override
- public String fsName() {
- return StorageUtils.BLOB;
+ @Override
+ public Fs getFs(String user, String proxyUser, String label) {
+ AzureBlobFileSystem fs = new AzureBlobFileSystem();
+ try {
+ fs.init(null);
+ } catch (IOException e) {
+ LOG.warn("get file system failed", e);
}
+ fs.setUser(user);
+ return fs;
+ }
+
+ @Override
+ public String fsName() {
+ return StorageUtils.BLOB();
+ }
}
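
The reformatted factory above is exercised roughly as follows (a hedged
usage sketch; the registry that maps fsName() to a BuildFactory instance is
assumed here and is not part of this diff):

    BuildFactory factory = new BuildAzureBlobFileSystem();
    Fs fs = factory.getFs("hadoop", null); // proxyUser is ignored by this factory
    String name = factory.fsName();        // resolves via the StorageUtils.BLOB() accessor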
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java
index 67475aecf2..35473a535f 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java
@@ -17,15 +17,6 @@
package org.apache.linkis.storage.fs.impl;
-import com.azure.core.util.polling.SyncPoller;
-import com.azure.storage.blob.BlobClient;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.models.BlobCopyInfo;
-import com.azure.storage.blob.models.BlobStorageException;
-import com.azure.storage.blob.specialized.BlobOutputStream;
-import com.azure.storage.blob.specialized.BlockBlobClient;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.fs.FileSystem;
@@ -40,362 +31,397 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import com.azure.core.util.polling.SyncPoller;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobCopyInfo;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.specialized.BlobOutputStream;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+
import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW;
-import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA;
public class AzureBlobFileSystem extends FileSystem {
- private static final String SLASH = "/";
-
- public static class PahtInfo {
- private String schema = "http://"; // http
- private String domain; //
- private String container; // container name
- private String blobName; // blob name
- private String tail;
-
- public PahtInfo(String domain, String container, String blobName) {
- this.domain = domain;
- this.container = container;
- this.blobName = blobName;
- if (blobName != null) {
- String[] names = blobName.split(SLASH, -1);
- tail = names[names.length - 1];
- }
- }
-
- public String toFullName() {
- return schema + domain + SLASH + container + SLASH + blobName;
- }
-
- public String getSchema() {
- return schema;
- }
-
- public String getDomain() {
- return domain;
- }
-
- public String getContainer() {
- return container;
- }
-
- public String getBlobName() {
- return blobName;
- }
-
- public String getTail() {
- return tail;
- }
-
- @Override
- public String toString() {
- return "PahtInfo{" +
- "schema='" + schema + '\'' +
- ", domain='" + domain + '\'' +
- ", container='" + container + '\'' +
- ", blobName='" + blobName + '\'' +
- ", tail='" + tail + '\'' +
- '}';
- }
+ private static final String SLASH = "/";
+
+ public static class PahtInfo {
+ private String schema = "http://"; // http
+ private String domain; //
+ private String container; // container name
+ private String blobName; // blob name
+ private String tail;
+
+ public PahtInfo(String domain, String container, String blobName) {
+ this.domain = domain;
+ this.container = container;
+ this.blobName = blobName;
+ if (blobName != null) {
+ String[] names = blobName.split(SLASH, -1);
+ tail = names[names.length - 1];
+ }
}
- /**
- * manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器
- */
- private BlobServiceClient serviceClient;
-
- /**
- * getBlobContainerClient
- *
- * @param containerName
- * @return client which can manipulate Azure Storage containers and their blobs.<br>
- * 操作一个容器和其blobs的客户端
- */
- private BlobContainerClient getBlobContainerClient(String containerName) {
- return serviceClient.getBlobContainerClient(containerName);
+ public String toFullName() {
+ return schema + domain + SLASH + container + SLASH + blobName;
}
- private PahtInfo azureLocation(String path) {
- return this.azureLocation(new FsPath(path));
+ public String getSchema() {
+ return schema;
}
- /**
- * @param dest
- * @return domain name,container name,blob name
- */
- private PahtInfo azureLocation(FsPath dest) {
- //https://myaccount.blob.core.windows.net/mycontainer/dir/blobname
- // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname
- String path = dest.getPath();
- // myaccount.blob.core.windows.net/mycontainer/dir/blobname
- // will split to myaccount.blob.core.windows.net
- // and mycontainer/dir/blobname
- String[] paths = path.split(SLASH, 2);
- if (paths.length < 2) {
- throw new IllegalArgumentException("file path error,with out container:" + path);
- }
- // split to container and blob object,
- // container/dir/blobname will split to container and dir/blobname
- String[] names = paths[1].split(SLASH, 2);
- if (names.length < 2) {
- return new PahtInfo(paths[0], names[0], null);
- } else {
- return new PahtInfo(paths[0], names[0], names[1]);
- }
+ public String getDomain() {
+ return domain;
}
- /**
- * init serviceClient
- *
- * @param properties
- * @throws IOException
- */
- @Override
- public void init(Map<String, String> properties) throws IOException {
-
- /**
- * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间
- */
- String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties);
- String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR.getValue(properties);
- // Azure SDK client builders accept the credential as a parameter
- serviceClient =
- new BlobServiceClientBuilder()
- .endpoint(BLOB_SCHEMA + acctName + ".blob.core.windows.net/")
- .connectionString(connectStr)
- .buildClient();
+ public String getContainer() {
+ return container;
}
- /**
- * name of the fileSystem
- *
- * @return
- */
- @Override
- public String fsName() {
- return StorageUtils.BLOB;
+ public String getBlobName() {
+ return blobName;
}
- @Override
- public String rootUserName() {
- return "";
+ public String getTail() {
+ return tail;
}
- /**
- * @param dest
- * @return
- * @throws IOException
- */
@Override
- public FsPath get(String dest) throws IOException {
- FsPath path = new FsPath(dest);
- if (exists(path)) {
- return path;
- } else {
- throw new StorageWarnException(
- TO_BE_UNKNOW.getErrorCode(),
- "File or folder does not exist or file name is
garbled(文件或者文件夹不存在或者文件名乱码)");
- }
+ public String toString() {
+ return "PahtInfo{"
+ + "schema='"
+ + schema
+ + '\''
+ + ", domain='"
+ + domain
+ + '\''
+ + ", container='"
+ + container
+ + '\''
+ + ", blobName='"
+ + blobName
+ + '\''
+ + ", tail='"
+ + tail
+ + '\''
+ + '}';
}
-
- /**
- * Opens a blob input stream to download the blob.
- *
- * @param dest
- * @return
- * @throws BlobStorageException – If a storage service error occurred.
- */
- @Override
- public InputStream read(FsPath dest) {
- PahtInfo result = azureLocation(dest);
- BlobClient blobclient =
- getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName());
- return blobclient.openInputStream();
+ }
+
+ /** manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 */
+ private BlobServiceClient serviceClient;
+
+ /**
+ * getBlobContainerClient
+ *
+ * @param containerName
+ * @return client which can manipulate Azure Storage containers and their blobs.<br>
+ * 操作一个容器和其blobs的客户端
+ */
+ private BlobContainerClient getBlobContainerClient(String containerName) {
+ return serviceClient.getBlobContainerClient(containerName);
+ }
+
+ private PahtInfo azureLocation(String path) {
+ return this.azureLocation(new FsPath(path));
+ }
+
+ /**
+ * @param dest
+ * @return domain name,container name,blob name
+ */
+ private PahtInfo azureLocation(FsPath dest) {
+ // https://myaccount.blob.core.windows.net/mycontainer/dir/blobname
+ // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname
+ String path = dest.getPath();
+ // myaccount.blob.core.windows.net/mycontainer/dir/blobname
+ // will split to myaccount.blob.core.windows.net
+ // and mycontainer/dir/blobname
+ String[] paths = path.split(SLASH, 2);
+ if (paths.length < 2) {
+ throw new IllegalArgumentException("file path error,with out container:" + path);
}
-
- /**
- * @param dest
- * @param overwrite
- * @return
- * @throws BlobStorageException – If a storage service error occurred.
- * @see BlockBlobClient #getBlobOutputStream
- */
- @Override
- public OutputStream write(FsPath dest, boolean overwrite) {
-
- PahtInfo result = azureLocation(dest);
- BlobClient blobclient =
- getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName());
- return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite);
+ // split to container and blob object,
+ // container/dir/blobname will split to container and dir/blobname
+ String[] names = paths[1].split(SLASH, 2);
+ if (names.length < 2) {
+ return new PahtInfo(paths[0], names[0], null);
+ } else {
+ return new PahtInfo(paths[0], names[0], names[1]);
}
+ }
- /**
- * create a blob<br>
- * 创建一个对象("文件")
- *
- * @param dest
- * @return
- * @throws IOException
- */
- @Override
- public boolean create(String dest) throws IOException {
- FsPath path = new FsPath(dest);
- if (exists(path)) {
- return false;
- }
- PahtInfo names = this.azureLocation(dest);
- // TODO 如果是路径的话后面补一个文件.
- if (!names.getTail().contains(".")) {
- String tmp = names.toFullName() + SLASH + "_tmp.txt";
- names = this.azureLocation(tmp);
- }
- BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer());
- try (BlobOutputStream bos =
- client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) {
- bos.write(1);
- bos.flush();
- }
-
- return true;
- }
+ /**
+ * init serviceClient
+ *
+ * @param properties
+ * @throws IOException
+ */
+ @Override
+ public void init(Map<String, String> properties) throws IOException {
/**
- * Flat listing 5000 results at a time,without deleted.<br>
- * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口?
- *
- * @param path
- * @return
- * @throws IOException
* The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间
*/
- @Override
- public List<FsPath> list(FsPath path) throws IOException {
- final PahtInfo result = azureLocation(path);
- return getBlobContainerClient(result.getContainer()).listBlobs().stream()
- // Azure不会返回已删除对象
- .filter(item -> !item.isDeleted())
- .map(item -> {
- FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName());
- // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明
- if (item.getProperties().getContentType() == null) {
- tmp.setIsdir(true);
- }
- return tmp;
- })
- .collect(Collectors.toList());
+ String acctName = StorageConfiguration.AZURE_ACCT_NAME().getValue(properties);
+ String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR().getValue(properties);
+ // Azure SDK client builders accept the credential as a parameter
+ serviceClient =
+ new BlobServiceClientBuilder()
+ .endpoint(StorageUtils.BLOB_SCHEMA() + acctName + ".blob.core.windows.net/")
+ .connectionString(connectStr)
+ .buildClient();
+ }
+
+ /**
+ * name of the fileSystem
+ *
+ * @return
+ */
+ @Override
+ public String fsName() {
+ return StorageUtils.BLOB();
+ }
+
+ @Override
+ public String rootUserName() {
+ return "";
+ }
+
+ /**
+ * @param dest
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public FsPath get(String dest) throws IOException {
+ FsPath path = new FsPath(dest);
+ if (exists(path)) {
+ return path;
+ } else {
+ throw new StorageWarnException(
+ TO_BE_UNKNOW.getErrorCode(),
+ "File or folder does not exist or file name is
garbled(文件或者文件夹不存在或者文件名乱码)");
}
-
- @Override
- public boolean canRead(FsPath dest) throws IOException {
- if (this.exists(dest)) {
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canWrite(FsPath dest) throws IOException {
- if (this.exists(dest)) {
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public boolean exists(FsPath dest) throws IOException {
- PahtInfo file = this.azureLocation(dest);
- return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists();
- }
-
- @Override
- public boolean delete(FsPath dest) throws IOException {
- PahtInfo file = this.azureLocation(dest);
- return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).deleteIfExists();
- }
-
- @Override
- public boolean copy(String origin, String dest) throws IOException {
- PahtInfo oriNames = this.azureLocation(origin);
- PahtInfo destNames = this.azureLocation(dest);
-
- BlobClient oriClient =
- getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName());
- BlockBlobClient destClient =
- getBlobContainerClient(destNames.getContainer())
- .getBlobClient(destNames.getBlobName())
- .getBlockBlobClient();
- SyncPoller<BlobCopyInfo, Void> poller = destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2));
- poller.waitForCompletion();
- return true;
- }
-
- @Override
- public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
- // 没有事务性保证
- this.copy(oldDest.getPath(), newDest.getPath());
- this.delete(oldDest);
- return true;
- }
-
- @Override
- public boolean mkdir(FsPath dest) throws IOException {
- return this.create(dest.getPath());
- }
-
- @Override
- public boolean mkdirs(FsPath dest) throws IOException {
- return this.mkdir(dest);
- }
-
- // 下面这些方法可能都无法支持
- @Override
- public String listRoot() throws IOException {
- return "";
- }
-
- @Override
- public long getTotalSpace(FsPath dest) throws IOException {
- return 0;
- }
-
- @Override
- public long getFreeSpace(FsPath dest) throws IOException {
- return 0;
- }
-
- @Override
- public long getUsableSpace(FsPath dest) throws IOException {
- return 0;
- }
-
- @Override
- public boolean canExecute(FsPath dest) throws IOException {
- return false;
- }
-
- @Override
- public boolean setOwner(FsPath dest, String user, String group) throws IOException {
- return false;
+ }
+
+ /**
+ * Opens a blob input stream to download the blob.
+ *
+ * @param dest
+ * @return
+ * @throws BlobStorageException – If a storage service error occurred.
+ */
+ @Override
+ public InputStream read(FsPath dest) {
+ PahtInfo result = azureLocation(dest);
+ BlobClient blobclient =
+ getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName());
+ return blobclient.openInputStream();
+ }
+
+ /**
+ * @param dest
+ * @param overwrite
+ * @return
+ * @throws BlobStorageException – If a storage service error occurred.
+ * @see BlockBlobClient #getBlobOutputStream
+ */
+ @Override
+ public OutputStream write(FsPath dest, boolean overwrite) {
+
+ PahtInfo result = azureLocation(dest);
+ BlobClient blobclient =
+ getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName());
+ return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite);
+ }
+
+ /**
+ * create a blob<br>
+ * 创建一个对象("文件")
+ *
+ * @param dest
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public boolean create(String dest) throws IOException {
+ FsPath path = new FsPath(dest);
+ if (exists(path)) {
+ return false;
}
-
- @Override
- public boolean setOwner(FsPath dest, String user) throws IOException {
- return false;
+ PahtInfo names = this.azureLocation(dest);
+ // TODO 如果是路径的话后面补一个文件.
+ if (!names.getTail().contains(".")) {
+ String tmp = names.toFullName() + SLASH + "_tmp.txt";
+ names = this.azureLocation(tmp);
}
-
- @Override
- public boolean setGroup(FsPath dest, String group) throws IOException {
- return false;
+ BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer());
+ try (BlobOutputStream bos =
+ client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) {
+ bos.write(1);
+ bos.flush();
}
- @Override
- public boolean setPermission(FsPath dest, String permission) throws IOException {
- return false;
+ return true;
+ }
+
+ /**
+ * Flat listing 5000 results at a time,without deleted.<br>
+ * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口?
+ *
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public List<FsPath> list(FsPath path) throws IOException {
+ final PahtInfo result = azureLocation(path);
+ return getBlobContainerClient(result.getContainer()).listBlobs().stream()
+ // Azure不会返回已删除对象
+ .filter(item -> !item.isDeleted())
+ .map(
+ item -> {
+ FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName());
+ // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明
+ if (item.getProperties().getContentType() == null) {
+ tmp.setIsdir(true);
+ }
+ return tmp;
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean canRead(FsPath dest) throws IOException {
+ if (this.exists(dest)) {
+ return true;
+ } else {
+ return false;
}
-
- @Override
- public void close() throws IOException {
+ }
+
+ @Override
+ public boolean canRead(FsPath dest, String user) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean canWrite(FsPath dest) throws IOException {
+ if (this.exists(dest)) {
+ return true;
+ } else {
+ return false;
}
+ }
+
+ @Override
+ public boolean exists(FsPath dest) throws IOException {
+ PahtInfo file = this.azureLocation(dest);
+ return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists();
+ }
+
+ @Override
+ public boolean delete(FsPath dest) throws IOException {
+ PahtInfo file = this.azureLocation(dest);
+ return getBlobContainerClient(file.getContainer())
+ .getBlobClient(file.getBlobName())
+ .deleteIfExists();
+ }
+
+ @Override
+ public boolean copy(String origin, String dest) throws IOException {
+ PahtInfo oriNames = this.azureLocation(origin);
+ PahtInfo destNames = this.azureLocation(dest);
+
+ BlobClient oriClient =
+ getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName());
+ BlockBlobClient destClient =
+ getBlobContainerClient(destNames.getContainer())
+ .getBlobClient(destNames.getBlobName())
+ .getBlockBlobClient();
+ SyncPoller<BlobCopyInfo, Void> poller =
+ destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2));
+ poller.waitForCompletion();
+ return true;
+ }
+
+ @Override
+ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
+ // 没有事务性保证
+ this.copy(oldDest.getPath(), newDest.getPath());
+ this.delete(oldDest);
+ return true;
+ }
+
+ @Override
+ public boolean mkdir(FsPath dest) throws IOException {
+ return this.create(dest.getPath());
+ }
+
+ @Override
+ public boolean mkdirs(FsPath dest) throws IOException {
+ return this.mkdir(dest);
+ }
+
+ // 下面这些方法可能都无法支持
+ @Override
+ public String listRoot() throws IOException {
+ return "";
+ }
+
+ @Override
+ public long getTotalSpace(FsPath dest) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getFreeSpace(FsPath dest) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getUsableSpace(FsPath dest) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getLength(FsPath dest) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public String checkSum(FsPath dest) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean canExecute(FsPath dest) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean setOwner(FsPath dest, String user, String group) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean setOwner(FsPath dest, String user) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean setGroup(FsPath dest, String group) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean setPermission(FsPath dest, String permission) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {}
}
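
The path handling in AzureBlobFileSystem is easiest to follow with the
example already used in its comments: azureLocation() applies two splits,
each with a limit of 2. A standalone sketch of that logic:

    // Two-stage split used by azureLocation(), shown on the example path
    // from the code comments above.
    String path = "myaccount.blob.core.windows.net/mycontainer/dir/blobname";
    String[] paths = path.split("/", 2);
    // paths[0] -> "myaccount.blob.core.windows.net" (domain)
    String[] names = paths[1].split("/", 2);
    // names[0] -> "mycontainer"  (container)
    // names[1] -> "dir/blobname" (blob name; absent when the path has no blob part)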
diff --git a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
index a821038005..e5adef9124 100644
--- a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
+++ b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
@@ -1,83 +1,84 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.storage.utils
-
-import org.junit.jupiter.api.{Assertions, DisplayName, Test}
-
-class StorageConfigurationTest {
-
- @Test
- @DisplayName("constTest")
- def constTest(): Unit = {
-
- val storagerootuser = StorageConfiguration.STORAGE_ROOT_USER.getValue
- val hdfsrootuser = StorageConfiguration.HDFS_ROOT_USER.getValue
- val localrootuser = StorageConfiguration.LOCAL_ROOT_USER.getValue
- val storageusergroup = StorageConfiguration.STORAGE_USER_GROUP.getValue
- val storagersfiletype = StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue
- val storagersfilesuffix = StorageConfiguration.STORAGE_RS_FILE_SUFFIX.getValue
- val types = StorageConfiguration.ResultTypes
- val storageresultsetpackage = StorageConfiguration.STORAGE_RESULT_SET_PACKAGE.getValue
- val storageresultsetclasses = StorageConfiguration.STORAGE_RESULT_SET_CLASSES.getValue
- val storagebuildfsclasses = StorageConfiguration.STORAGE_BUILD_FS_CLASSES.getValue
- val issharenode = StorageConfiguration.IS_SHARE_NODE.getValue
- val enableioproxy = StorageConfiguration.ENABLE_IO_PROXY.getValue
- val ioUser = StorageConfiguration.IO_USER.getValue
- val iofsexpiretime = StorageConfiguration.IO_FS_EXPIRE_TIME.getValue
- val iodefaultcreator = StorageConfiguration.IO_DEFAULT_CREATOR.getValue
- val iofsreinit = StorageConfiguration.IO_FS_RE_INIT.getValue
- val ioinitretrylimit = StorageConfiguration.IO_INIT_RETRY_LIMIT.getValue
- val storagehdfsgroup = StorageConfiguration.STORAGE_HDFS_GROUP.getValue
- val doublefractionlen = StorageConfiguration.DOUBLE_FRACTION_LEN.getValue
- val hdfspathprefixcheckon = StorageConfiguration.HDFS_PATH_PREFIX_CHECK_ON.getValue
- val hdfspathprefixremove = StorageConfiguration.HDFS_PATH_PREFIX_REMOVE.getValue
- val fschecksumdisbale = StorageConfiguration.FS_CHECKSUM_DISBALE
-
- Assertions.assertEquals("hadoop", storagerootuser)
- Assertions.assertEquals("hadoop", hdfsrootuser)
- Assertions.assertEquals("root", localrootuser)
- Assertions.assertEquals("bdap", storageusergroup)
- Assertions.assertEquals("utf-8", storagersfiletype)
- Assertions.assertEquals(".dolphin", storagersfilesuffix)
- Assertions.assertTrue(types.size > 0)
- Assertions.assertEquals("org.apache.linkis.storage.resultset",
storageresultsetpackage)
- Assertions.assertEquals(
- "txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet",
- storageresultsetclasses
- )
- Assertions.assertEquals(
- "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," +
- "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem",
- storagebuildfsclasses
- )
- Assertions.assertTrue(issharenode)
- Assertions.assertFalse(enableioproxy)
- Assertions.assertEquals("root", ioUser)
- Assertions.assertTrue(600000 == iofsexpiretime)
- Assertions.assertEquals("IDE", iodefaultcreator)
- Assertions.assertEquals("re-init", iofsreinit)
- Assertions.assertTrue(10 == ioinitretrylimit)
- Assertions.assertEquals("hadoop", storagehdfsgroup)
- Assertions.assertTrue(30 == doublefractionlen)
- Assertions.assertTrue(hdfspathprefixcheckon)
- Assertions.assertTrue(hdfspathprefixremove)
- Assertions.assertFalse(fschecksumdisbale)
-
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.utils
+
+import org.junit.jupiter.api.{Assertions, DisplayName, Test}
+
+class StorageConfigurationTest {
+
+ @Test
+ @DisplayName("constTest")
+ def constTest(): Unit = {
+
+ val storagerootuser = StorageConfiguration.STORAGE_ROOT_USER.getValue
+ val hdfsrootuser = StorageConfiguration.HDFS_ROOT_USER.getValue
+ val localrootuser = StorageConfiguration.LOCAL_ROOT_USER.getValue
+ val storageusergroup = StorageConfiguration.STORAGE_USER_GROUP.getValue
+ val storagersfiletype = StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue
+ val storagersfilesuffix = StorageConfiguration.STORAGE_RS_FILE_SUFFIX.getValue
+ val types = StorageConfiguration.ResultTypes
+ val storageresultsetpackage = StorageConfiguration.STORAGE_RESULT_SET_PACKAGE.getValue
+ val storageresultsetclasses = StorageConfiguration.STORAGE_RESULT_SET_CLASSES.getValue
+ val storagebuildfsclasses = StorageConfiguration.STORAGE_BUILD_FS_CLASSES.getValue
+ val issharenode = StorageConfiguration.IS_SHARE_NODE.getValue
+ val enableioproxy = StorageConfiguration.ENABLE_IO_PROXY.getValue
+ val ioUser = StorageConfiguration.IO_USER.getValue
+ val iofsexpiretime = StorageConfiguration.IO_FS_EXPIRE_TIME.getValue
+ val iodefaultcreator = StorageConfiguration.IO_DEFAULT_CREATOR.getValue
+ val iofsreinit = StorageConfiguration.IO_FS_RE_INIT.getValue
+ val ioinitretrylimit = StorageConfiguration.IO_INIT_RETRY_LIMIT.getValue
+ val storagehdfsgroup = StorageConfiguration.STORAGE_HDFS_GROUP.getValue
+ val doublefractionlen = StorageConfiguration.DOUBLE_FRACTION_LEN.getValue
+ val hdfspathprefixcheckon = StorageConfiguration.HDFS_PATH_PREFIX_CHECK_ON.getValue
+ val hdfspathprefixremove = StorageConfiguration.HDFS_PATH_PREFIX_REMOVE.getValue
+ val fschecksumdisbale = StorageConfiguration.FS_CHECKSUM_DISBALE
+
+ Assertions.assertEquals("hadoop", storagerootuser)
+ Assertions.assertEquals("hadoop", hdfsrootuser)
+ Assertions.assertEquals("root", localrootuser)
+ Assertions.assertEquals("bdap", storageusergroup)
+ Assertions.assertEquals("utf-8", storagersfiletype)
+ Assertions.assertEquals(".dolphin", storagersfilesuffix)
+ Assertions.assertTrue(types.size > 0)
+ Assertions.assertEquals("org.apache.linkis.storage.resultset",
storageresultsetpackage)
+ Assertions.assertEquals(
+ "txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet",
+ storageresultsetclasses
+ )
+ Assertions.assertEquals(
+ "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," +
+ "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," +
+ "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem",
+ storagebuildfsclasses
+ )
+ Assertions.assertTrue(issharenode)
+ Assertions.assertFalse(enableioproxy)
+ Assertions.assertEquals("root", ioUser)
+ Assertions.assertTrue(600000 == iofsexpiretime)
+ Assertions.assertEquals("IDE", iodefaultcreator)
+ Assertions.assertEquals("re-init", iofsreinit)
+ Assertions.assertTrue(10 == ioinitretrylimit)
+ Assertions.assertEquals("hadoop", storagehdfsgroup)
+ Assertions.assertTrue(30 == doublefractionlen)
+ Assertions.assertTrue(hdfspathprefixcheckon)
+ Assertions.assertTrue(hdfspathprefixremove)
+ Assertions.assertFalse(fschecksumdisbale)
+
+ }
+
+}
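
The test change tracks the new default of STORAGE_BUILD_FS_CLASSES, whose
comma-separated list now includes BuildAzureBlobFileSystem alongside the
HDFS, local, OSS and S3 factories. A hedged sketch of how such a list is
typically turned into factory instances (the actual Linkis loading code is
not shown in this diff):

    // Illustrative only: reflectively instantiate each configured factory
    // class; the method name and registration step are assumptions.
    static void loadFactories(String classes) throws Exception {
      for (String name : classes.split(",")) {
        Object factory = Class.forName(name).getDeclaredConstructor().newInstance();
        // e.g. register under ((BuildFactory) factory).fsName()
      }
    }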
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]