This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new bf1df9faf Fix a series of problems in the s3 file system (#5072)
bf1df9faf is described below
commit bf1df9faf8c584f2da513b93141bd34e3cfa9389
Author: LiuGuoHua <[email protected]>
AuthorDate: Wed Jan 17 21:09:47 2024 +0800
Fix a series of problems in the s3 file system (#5072)
---
.../linkis/storage/fs/impl/S3FileSystem.java | 132 +++++++++++++++++----
1 file changed, 112 insertions(+), 20 deletions(-)
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
index b8f6401b1..026a6f9e0 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
@@ -38,10 +38,7 @@ import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +111,7 @@ public class S3FileSystem extends FileSystem {
@Override
public InputStream read(FsPath dest) throws IOException {
try {
- return s3Client.getObject(bucket, dest.getPath()).getObjectContent();
+ return s3Client.getObject(bucket, buildPrefix(dest.getPath(),
false)).getObjectContent();
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " +
dest.getPath());
}
@@ -123,8 +120,9 @@ public class S3FileSystem extends FileSystem {
@Override
public OutputStream write(FsPath dest, boolean overwrite) throws IOException
{
try (InputStream inputStream = read(dest);
- OutputStream outputStream = new S3OutputStream(s3Client, bucket,
dest.getPath())) {
- if (overwrite) {
+ OutputStream outputStream =
+ new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(),
false))) {
+ if (!overwrite) {
IOUtils.copy(inputStream, outputStream);
}
return outputStream;
@@ -164,20 +162,37 @@ public class S3FileSystem extends FileSystem {
@Override
public FsPathListWithError listPathWithError(FsPath path) throws IOException
{
+ return listPathWithError(path, true);
+ }
+
+ public FsPathListWithError listPathWithError(FsPath path, boolean
ignoreInitFile)
+ throws IOException {
+ List<FsPath> rtn = new ArrayList<>();
try {
if (!StringUtils.isEmpty(path.getPath())) {
- ListObjectsV2Result listObjectsV2Result =
s3Client.listObjectsV2(bucket, path.getPath());
- List<S3ObjectSummary> s3ObjectSummaries =
listObjectsV2Result.getObjectSummaries();
+ ListObjectsV2Request listObjectsV2Request =
+ new ListObjectsV2Request()
+ .withBucketName(bucket)
+ .withPrefix(buildPrefix(path.getPath()))
+ .withDelimiter("/");
+ ListObjectsV2Result dirResult =
s3Client.listObjectsV2(listObjectsV2Request);
+ List<S3ObjectSummary> s3ObjectSummaries =
dirResult.getObjectSummaries();
+ List<String> commonPrefixes = dirResult.getCommonPrefixes();
if (s3ObjectSummaries != null) {
- List<FsPath> rtn = new ArrayList();
- String message = "";
for (S3ObjectSummary summary : s3ObjectSummaries) {
- if (isDir(summary, path.getPath()) || isInitFile(summary))
continue;
+ if (isInitFile(summary) && ignoreInitFile) continue;
FsPath newPath = new FsPath(buildPath(summary.getKey()));
rtn.add(fillStorageFile(newPath, summary));
}
- return new FsPathListWithError(rtn, message);
}
+ if (commonPrefixes != null) {
+ for (String dir : commonPrefixes) {
+ FsPath newPath = new FsPath(buildPath(dir));
+ newPath.setIsdir(true);
+ rtn.add(newPath);
+ }
+ }
+ return new FsPathListWithError(rtn, "");
}
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " +
path.getPath());
@@ -189,8 +204,25 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean exists(FsPath dest) throws IOException {
try {
- int size = s3Client.listObjectsV2(bucket,
dest.getPath()).getObjectSummaries().size();
- return size > 0;
+ if (new File(dest.getPath()).getName().contains(".")) {
+ return existsFile(dest);
+ }
+ ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+ listObjectsV2Request
+ .withBucketName(bucket)
+ .withPrefix(buildPrefix(dest.getPath()))
+ .withDelimiter("/");
+ return
s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
+ +
s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
+ > 0;
+ } catch (AmazonS3Exception e) {
+ return false;
+ }
+ }
+
+ public boolean existsFile(FsPath dest) {
+ try {
+ return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(),
false));
} catch (AmazonS3Exception e) {
return false;
}
@@ -199,7 +231,14 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean delete(FsPath dest) throws IOException {
try {
- s3Client.deleteObject(bucket, dest.getPath());
+ ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(),
false));
+ ListObjectsV2Result result =
s3Client.listObjectsV2(listObjectsV2Request);
+ String[] keyList =
+
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
+ DeleteObjectsRequest deleteObjectsRequest =
+ new DeleteObjectsRequest("test").withKeys(keyList);
+ s3Client.deleteObjects(deleteObjectsRequest);
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " +
dest.getPath());
@@ -209,8 +248,25 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
try {
- s3Client.copyObject(bucket, oldDest.getPath(), bucket,
newDest.getPath());
- s3Client.deleteObject(bucket, oldDest.getPath());
+ String newOriginPath = buildPrefix(oldDest.getPath(), false);
+ String newDestPath = buildPrefix(newDest.getPath(), false);
+ ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+ listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
+ ListObjectsV2Result result =
s3Client.listObjectsV2(listObjectsV2Request);
+ List<String> keyList =
+ result.getObjectSummaries().stream()
+ .map(S3ObjectSummary::getKey)
+ .collect(Collectors.toList());
+ List<String> newKeyList =
+ keyList.stream()
+ .map(key -> key.replaceFirst(newOriginPath, newDestPath))
+ .collect(Collectors.toList());
+ for (int i = 0; i < keyList.size(); i++) {
+ String key = keyList.get(i);
+ String newKey = newKeyList.get(i);
+ s3Client.copyObject(bucket, key, bucket, newKey);
+ s3Client.deleteObject(bucket, key);
+ }
return true;
} catch (AmazonS3Exception e) {
s3Client.deleteObject(bucket, newDest.getPath());
@@ -225,7 +281,24 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean copy(String origin, String dest) throws IOException {
try {
- s3Client.copyObject(bucket, origin, bucket, dest);
+ String newOrigin = buildPrefix(origin, false);
+ String newDest = buildPrefix(dest, false);
+ ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+ listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
+ ListObjectsV2Result result =
s3Client.listObjectsV2(listObjectsV2Request);
+ List<String> keyList =
+ result.getObjectSummaries().stream()
+ .map(S3ObjectSummary::getKey)
+ .collect(Collectors.toList());
+ List<String> newKeyList =
+ keyList.stream()
+ .map(key -> key.replaceFirst(newOrigin, newDest))
+ .collect(Collectors.toList());
+ for (int i = 0; i < keyList.size(); i++) {
+ String key = keyList.get(i);
+ String newKey = newKeyList.get(i);
+ s3Client.copyObject(bucket, key, bucket, newKey);
+ }
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + origin
+ " or " + dest);
@@ -261,7 +334,10 @@ public class S3FileSystem extends FileSystem {
private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary
s3ObjectSummary) {
fsPath.setModification_time(s3ObjectSummary.getLastModified().getTime());
- fsPath.setOwner(s3ObjectSummary.getOwner().getDisplayName());
+ Owner owner = s3ObjectSummary.getOwner();
+ if (owner != null) {
+ fsPath.setOwner(owner.getDisplayName());
+ }
try {
fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
} catch (Throwable e) {
@@ -344,6 +420,22 @@ public class S3FileSystem extends FileSystem {
}
return StorageUtils.S3_SCHEMA + "/" + path;
}
+
+ public String buildPrefix(String path, boolean addTail) {
+ String res = path;
+ if (path == null || "".equals(path)) return "";
+ if (path.startsWith("/")) {
+ res = path.replaceFirst("/", "");
+ }
+ if (!path.endsWith("/") && addTail) {
+ res = res + "/";
+ }
+ return res;
+ }
+
+ public String buildPrefix(String path) {
+ return buildPrefix(path, true);
+ }
}
class S3OutputStream extends ByteArrayOutputStream {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]