This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new bf1df9faf Fix a series of problems in the s3 file system (#5072)
bf1df9faf is described below

commit bf1df9faf8c584f2da513b93141bd34e3cfa9389
Author: LiuGuoHua <[email protected]>
AuthorDate: Wed Jan 17 21:09:47 2024 +0800

    Fix a series of problems in the s3 file system (#5072)
---
 .../linkis/storage/fs/impl/S3FileSystem.java       | 132 +++++++++++++++++----
 1 file changed, 112 insertions(+), 20 deletions(-)

diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
index b8f6401b1..026a6f9e0 100644
--- 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
@@ -38,10 +38,7 @@ import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,7 +111,7 @@ public class S3FileSystem extends FileSystem {
   @Override
   public InputStream read(FsPath dest) throws IOException {
     try {
-      return s3Client.getObject(bucket, dest.getPath()).getObjectContent();
+      return s3Client.getObject(bucket, buildPrefix(dest.getPath(), 
false)).getObjectContent();
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + 
dest.getPath());
     }
@@ -123,8 +120,9 @@ public class S3FileSystem extends FileSystem {
   @Override
   public OutputStream write(FsPath dest, boolean overwrite) throws IOException 
{
     try (InputStream inputStream = read(dest);
-        OutputStream outputStream = new S3OutputStream(s3Client, bucket, 
dest.getPath())) {
-      if (overwrite) {
+        OutputStream outputStream =
+            new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), 
false))) {
+      if (!overwrite) {
         IOUtils.copy(inputStream, outputStream);
       }
       return outputStream;
@@ -164,20 +162,37 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public FsPathListWithError listPathWithError(FsPath path) throws IOException 
{
+    return listPathWithError(path, true);
+  }
+
+  public FsPathListWithError listPathWithError(FsPath path, boolean 
ignoreInitFile)
+      throws IOException {
+    List<FsPath> rtn = new ArrayList<>();
     try {
       if (!StringUtils.isEmpty(path.getPath())) {
-        ListObjectsV2Result listObjectsV2Result = 
s3Client.listObjectsV2(bucket, path.getPath());
-        List<S3ObjectSummary> s3ObjectSummaries = 
listObjectsV2Result.getObjectSummaries();
+        ListObjectsV2Request listObjectsV2Request =
+            new ListObjectsV2Request()
+                .withBucketName(bucket)
+                .withPrefix(buildPrefix(path.getPath()))
+                .withDelimiter("/");
+        ListObjectsV2Result dirResult = 
s3Client.listObjectsV2(listObjectsV2Request);
+        List<S3ObjectSummary> s3ObjectSummaries = 
dirResult.getObjectSummaries();
+        List<String> commonPrefixes = dirResult.getCommonPrefixes();
         if (s3ObjectSummaries != null) {
-          List<FsPath> rtn = new ArrayList();
-          String message = "";
           for (S3ObjectSummary summary : s3ObjectSummaries) {
-            if (isDir(summary, path.getPath()) || isInitFile(summary)) 
continue;
+            if (isInitFile(summary) && ignoreInitFile) continue;
             FsPath newPath = new FsPath(buildPath(summary.getKey()));
             rtn.add(fillStorageFile(newPath, summary));
           }
-          return new FsPathListWithError(rtn, message);
         }
+        if (commonPrefixes != null) {
+          for (String dir : commonPrefixes) {
+            FsPath newPath = new FsPath(buildPath(dir));
+            newPath.setIsdir(true);
+            rtn.add(newPath);
+          }
+        }
+        return new FsPathListWithError(rtn, "");
       }
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + 
path.getPath());
@@ -189,8 +204,25 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean exists(FsPath dest) throws IOException {
     try {
-      int size = s3Client.listObjectsV2(bucket, 
dest.getPath()).getObjectSummaries().size();
-      return size > 0;
+      if (new File(dest.getPath()).getName().contains(".")) {
+        return existsFile(dest);
+      }
+      ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+      listObjectsV2Request
+          .withBucketName(bucket)
+          .withPrefix(buildPrefix(dest.getPath()))
+          .withDelimiter("/");
+      return 
s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
+              + 
s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
+          > 0;
+    } catch (AmazonS3Exception e) {
+      return false;
+    }
+  }
+
+  public boolean existsFile(FsPath dest) {
+    try {
+      return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), 
false));
     } catch (AmazonS3Exception e) {
       return false;
     }
@@ -199,7 +231,14 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean delete(FsPath dest) throws IOException {
     try {
-      s3Client.deleteObject(bucket, dest.getPath());
+      ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+      
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(),
 false));
+      ListObjectsV2Result result = 
s3Client.listObjectsV2(listObjectsV2Request);
+      String[] keyList =
+          
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
+      DeleteObjectsRequest deleteObjectsRequest =
+          new DeleteObjectsRequest("test").withKeys(keyList);
+      s3Client.deleteObjects(deleteObjectsRequest);
       return true;
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + 
dest.getPath());
@@ -209,8 +248,25 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
     try {
-      s3Client.copyObject(bucket, oldDest.getPath(), bucket, 
newDest.getPath());
-      s3Client.deleteObject(bucket, oldDest.getPath());
+      String newOriginPath = buildPrefix(oldDest.getPath(), false);
+      String newDestPath = buildPrefix(newDest.getPath(), false);
+      ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+      listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
+      ListObjectsV2Result result = 
s3Client.listObjectsV2(listObjectsV2Request);
+      List<String> keyList =
+          result.getObjectSummaries().stream()
+              .map(S3ObjectSummary::getKey)
+              .collect(Collectors.toList());
+      List<String> newKeyList =
+          keyList.stream()
+              .map(key -> key.replaceFirst(newOriginPath, newDestPath))
+              .collect(Collectors.toList());
+      for (int i = 0; i < keyList.size(); i++) {
+        String key = keyList.get(i);
+        String newKey = newKeyList.get(i);
+        s3Client.copyObject(bucket, key, bucket, newKey);
+        s3Client.deleteObject(bucket, key);
+      }
       return true;
     } catch (AmazonS3Exception e) {
       s3Client.deleteObject(bucket, newDest.getPath());
@@ -225,7 +281,24 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean copy(String origin, String dest) throws IOException {
     try {
-      s3Client.copyObject(bucket, origin, bucket, dest);
+      String newOrigin = buildPrefix(origin, false);
+      String newDest = buildPrefix(dest, false);
+      ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+      listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
+      ListObjectsV2Result result = 
s3Client.listObjectsV2(listObjectsV2Request);
+      List<String> keyList =
+          result.getObjectSummaries().stream()
+              .map(S3ObjectSummary::getKey)
+              .collect(Collectors.toList());
+      List<String> newKeyList =
+          keyList.stream()
+              .map(key -> key.replaceFirst(newOrigin, newDest))
+              .collect(Collectors.toList());
+      for (int i = 0; i < keyList.size(); i++) {
+        String key = keyList.get(i);
+        String newKey = newKeyList.get(i);
+        s3Client.copyObject(bucket, key, bucket, newKey);
+      }
       return true;
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + origin 
+ " or " + dest);
@@ -261,7 +334,10 @@ public class S3FileSystem extends FileSystem {
 
   private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary 
s3ObjectSummary) {
     fsPath.setModification_time(s3ObjectSummary.getLastModified().getTime());
-    fsPath.setOwner(s3ObjectSummary.getOwner().getDisplayName());
+    Owner owner = s3ObjectSummary.getOwner();
+    if (owner != null) {
+      fsPath.setOwner(owner.getDisplayName());
+    }
     try {
       fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
     } catch (Throwable e) {
@@ -344,6 +420,22 @@ public class S3FileSystem extends FileSystem {
     }
     return StorageUtils.S3_SCHEMA + "/" + path;
   }
+
+  public String buildPrefix(String path, boolean addTail) {
+    String res = path;
+    if (path == null || "".equals(path)) return "";
+    if (path.startsWith("/")) {
+      res = path.replaceFirst("/", "");
+    }
+    if (!path.endsWith("/") && addTail) {
+      res = res + "/";
+    }
+    return res;
+  }
+
+  public String buildPrefix(String path) {
+    return buildPrefix(path, true);
+  }
 }
 
 class S3OutputStream extends ByteArrayOutputStream {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to