ShadowySpirits commented on code in PR #6495:
URL: https://github.com/apache/rocketmq/pull/6495#discussion_r1162266544


##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java:
##########
@@ -69,6 +75,6 @@ public interface TieredStoreProvider {
      * @param append try to append or create a new file
      * @return put result, <code>true</code> if data successfully write; 
<code>false</code> otherwise
      */
-    CompletableFuture<Boolean> 
commit0(TieredFileSegment.TieredFileSegmentInputStream inputStream,
-        long position, int length, boolean append);
+    CompletableFuture<Boolean> commit0(InputStream inputStream,

Review Comment:
   We should not change this.



##########
.ijwb/.bazelproject:
##########
@@ -0,0 +1,4 @@
+directories:
+  .
+
+derive_targets_from_directories: false

Review Comment:
   Why do we need this file? 



##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3Client.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+public class TieredStorageS3Client {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+    private volatile static TieredStorageS3Client instance;
+
+    private final String region;
+
+    private final String bucket;
+
+    private final TieredMessageStoreConfig tieredMessageStoreConfig;
+
+    private final ExecutorService asyncRequestBodyExecutor;
+
+    private S3AsyncClient client;
+
+    public static TieredStorageS3Client getInstance(TieredMessageStoreConfig 
config) {
+        if (config == null) {
+            return instance;
+        }
+        if (instance == null) {
+            synchronized (TieredStorageS3Client.class) {
+                if (instance == null) {
+                    instance = new TieredStorageS3Client(config, true);
+                }
+            }
+        }
+        return instance;
+    }
+
+    @VisibleForTesting
+    public TieredStorageS3Client(TieredMessageStoreConfig config) {
+        this(config, false);
+    }

Review Comment:
   This constructor is already public.



##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/ChunkMetadata.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+/**
+ * Metadata of a chunk in S3.
+ *
+ * <p>
+ * There are two types of chunks in S3:
+ * <ul>
+ *     <li>Normal chunk, represents a normal chunk in S3, which size is 
usually less than {@link 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig#tieredStoreGroupCommitSize}
+ *     <li>Segment chunk, means that this all normal chunks in one logic 
segment have been merged into a single chunk, which is named as segment chunk,
+ *     which size is usually equals to {@link 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig#tieredStoreCommitLogMaxSize}
 or {@link 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig#tieredStoreConsumeQueueMaxSize}

Review Comment:
   `tieredStoreCommitLogMaxSize` is private; use its getter method instead.



##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/ChunkMetadata.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+/**
+ * Metadata of a chunk in S3.
+ *
+ * <p>
+ * There are two types of chunks in S3:
+ * <ul>
+ *     <li>Normal chunk, represents a normal chunk in S3, which size is 
usually less than {@link 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig#tieredStoreGroupCommitSize}
+ *     <li>Segment chunk, means that this all normal chunks in one logic 
segment have been merged into a single chunk, which is named as segment chunk,
+ *     which size is usually equals to {@link 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig#tieredStoreCommitLogMaxSize}
 or {@link 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig#tieredStoreConsumeQueueMaxSize}
+ * </ul>
+ * Once a segment chunk is created, it will never be changed, and we should 
delete all normal chunks in this segment.
+ */
+public class ChunkMetadata {
+
+    /**
+     * Name of the chunk in S3. Format:
+     * <p>
+     * Chunk:
+     * <pre>
+     *     {@link S3FileSegment#storePath}/chunk/chunk-${startPosition}

Review Comment:
   Ditto.



##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java:
##########
@@ -105,6 +105,14 @@ public boolean check(TieredStorageLevel targetLevel) {
     private String ossAccessKey = "";
     private String ossSecretKey = "";
 
+    private String s3Region = "";
+
+    private String s3Bucket = "";
+
+    private String s3AccessKey = "";

Review Comment:
   Maybe we should unify the access key/secret key (AK/SK) names across the different providers.



##########
store/100:
##########


Review Comment:
   Remove this useless file.



##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3Client.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+public class TieredStorageS3Client {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+    private volatile static TieredStorageS3Client instance;
+
+    private final String region;
+
+    private final String bucket;
+
+    private final TieredMessageStoreConfig tieredMessageStoreConfig;
+
+    private final ExecutorService asyncRequestBodyExecutor;
+
+    private S3AsyncClient client;
+
+    public static TieredStorageS3Client getInstance(TieredMessageStoreConfig 
config) {
+        if (config == null) {
+            return instance;
+        }
+        if (instance == null) {
+            synchronized (TieredStorageS3Client.class) {
+                if (instance == null) {
+                    instance = new TieredStorageS3Client(config, true);
+                }
+            }
+        }
+        return instance;
+    }
+
+    @VisibleForTesting
+    public TieredStorageS3Client(TieredMessageStoreConfig config) {
+        this(config, false);
+    }
+
+    private TieredStorageS3Client(TieredMessageStoreConfig config, boolean 
createClient) {
+        this.tieredMessageStoreConfig = config;
+        this.region = config.getS3Region();
+        this.bucket = config.getS3Bucket();
+        if (createClient) {
+            AwsBasicCredentials basicCredentials = 
AwsBasicCredentials.create(this.tieredMessageStoreConfig.getS3AccessKey(), 
this.tieredMessageStoreConfig.getS3SecretKey());
+            this.client = S3AsyncClient.builder().credentialsProvider(() -> 
basicCredentials).region(Region.of(config.getS3Region())).build();
+        }
+        this.asyncRequestBodyExecutor = Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("S3AsyncRequestBodyExecutor_"));
+    }
+
+
+    public CompletableFuture<Boolean> writeChunk(String key, InputStream 
inputStream, long length) {
+        PutObjectRequest putObjectRequest = 
PutObjectRequest.builder().bucket(this.bucket).key(key).build();
+        AsyncRequestBody requestBody = 
AsyncRequestBody.fromInputStream(inputStream, length, 
this.asyncRequestBodyExecutor);
+        CompletableFuture<PutObjectResponse> 
putObjectResponseCompletableFuture = this.client.putObject(putObjectRequest, 
requestBody);
+        CompletableFuture<Boolean> completableFuture = new 
CompletableFuture<>();
+        putObjectResponseCompletableFuture.whenComplete((putObjectResponse, 
throwable) -> {
+            if (throwable != null) {
+                LOGGER.error("Upload file to S3 failed, key: {}, region: {}, 
bucket: {}", key, this.region, this.bucket, throwable);
+                completableFuture.complete(false);
+            } else {
+                completableFuture.complete(true);
+            }
+        });
+        return completableFuture;
+    }
+
+    public CompletableFuture<List<ChunkMetadata>> listChunks(String prefix) {
+        CompletableFuture<List<ChunkMetadata>> completableFuture = new 
CompletableFuture<>();
+        CompletableFuture<ListObjectsV2Response> listFuture = 
this.client.listObjectsV2(builder -> 
builder.bucket(this.bucket).prefix(prefix));
+        listFuture.whenComplete((listObjectsV2Response, throwable) -> {
+            if (throwable != null) {
+                LOGGER.error("List objects from S3 failed, prefix: {}, region: 
{}, bucket: {}", prefix, this.region, this.bucket, throwable);
+                completableFuture.complete(Collections.emptyList());
+            } else {
+                listObjectsV2Response.contents().forEach(s3Object -> {
+                    LOGGER.info("List objects from S3, key: {}, region: {}, 
bucket: {}", s3Object.key(), this.region, this.bucket);
+                });
+                
completableFuture.complete(listObjectsV2Response.contents().stream().map(obj -> 
{
+                    ChunkMetadata chunkMetadata = new ChunkMetadata();
+                    String key = obj.key();
+                    chunkMetadata.setChunkName(key);
+                    chunkMetadata.setChunkSize(obj.size().intValue());
+                    String[] paths = key.split("/");
+                    String chunkSubName = paths[paths.length - 1];
+                    Integer startPosition = 
Integer.valueOf(chunkSubName.split("-")[1]);
+                    chunkMetadata.setStartPosition(startPosition);
+                    return chunkMetadata;
+                }).sorted(new Comparator<ChunkMetadata>() {
+                    @Override
+                    public int compare(ChunkMetadata o1, ChunkMetadata o2) {
+                        return (int) (o1.getStartPosition() - 
o2.getStartPosition());
+                    }
+                }).collect(Collectors.toList()));
+            }
+        });
+        return completableFuture;
+    }
+
+    public CompletableFuture<Boolean> exist(String prefix) {
+        CompletableFuture<ListObjectsV2Response> listFuture = 
this.client.listObjectsV2(builder -> 
builder.bucket(this.bucket).prefix(prefix));
+        return listFuture.thenApply(resp -> {
+            return resp.contents().size() > 0;
+        });
+    }
+
+    public CompletableFuture<Boolean> deleteObject(String key) {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        this.client.deleteObject(builder -> 
builder.bucket(this.bucket).key(key)).whenComplete((deleteObjectResponse, 
throwable) -> {
+            if (throwable != null) {
+                LOGGER.error("Delete object from S3 failed, key: {}, region: 
{}, bucket: {}", key, this.region, this.bucket, throwable);
+                future.complete(false);
+            } else {
+                LOGGER.info("Delete object from S3, key: {}, region: {}, 
bucket: {}", key, this.region, this.bucket);
+                future.complete(true);
+            }
+        });
+        return future;
+    }
+
+    public CompletableFuture<List<String/*undeleted keys*/>> 
deleteObjets(final List<String> keys) {
+        if (keys == null || keys.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+        List<ObjectIdentifier> objects = keys.stream().map(key -> 
ObjectIdentifier.builder().key(key).build()).collect(Collectors.toList());
+        Delete delete = Delete.builder().objects(objects).build();
+        DeleteObjectsRequest deleteObjectsRequest = 
DeleteObjectsRequest.builder().bucket(this.bucket).delete(delete).build();
+        return this.client.deleteObjects(deleteObjectsRequest).thenApply(resp 
-> {
+            List<String> undeletedKeys = null;
+            if (resp.deleted().size() != keys.size()) {
+                List<String> deleted = 
resp.deleted().stream().map(deletedObject -> 
deletedObject.key()).collect(Collectors.toList());
+                undeletedKeys = keys.stream().filter(key -> 
!deleted.contains(key)).collect(Collectors.toList());
+            } else {
+                undeletedKeys = Collections.emptyList();
+            }
+            return undeletedKeys;
+        }).exceptionally(throwable -> {
+            LOGGER.error("Delete objects from S3 failed, keys: {}, region: {}, 
bucket: {}", keys, this.region, this.bucket, throwable);
+            return keys;
+        });
+    }
+
+    public CompletableFuture<List<String>> deleteObjects(String prefix) {
+        CompletableFuture<List<String>> readObjectsByPrefix = 
this.client.listObjectsV2(builder -> 
builder.bucket(this.bucket).prefix(prefix)).thenApply(resp -> {
+            return resp.contents().stream().map(s3Object -> 
s3Object.key()).collect(Collectors.toList());
+        });
+        return readObjectsByPrefix.thenCompose(keys -> {
+            return this.deleteObjets(keys);

Review Comment:
   Many lambda expressions in this file could be simplified; please follow the
suggestions from IntelliJ IDEA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to