KKcorps commented on a change in pull request #5249:
URL: https://github.com/apache/incubator-pinot/pull/5249#discussion_r412442556



##########
File path: 
pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
##########
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import 
software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.internal.resource.S3BucketResource;
+import software.amazon.awssdk.services.s3.model.*;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static joptsimple.internal.Strings.isNullOrEmpty;
+import static org.glassfish.jersey.internal.guava.Preconditions.checkArgument;
+
+public class S3PinotFS extends PinotFS {
+    public static final String ACCESS_KEY = "accessKey"; // configuration key read by init() for the AWS access key
+    public static final String SECRET_KEY = "secretKey"; // configuration key read by init() for the AWS secret key
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(S3PinotFS.class);
+    private static final String DELIMITER = "/"; // path separator used when building and comparing object keys
+    private S3Client s3Client; // assigned in init(); used by every S3 request in this class
+
+    /** Optional configuration key naming the AWS region the S3 client should talk to. */
+    public static final String REGION = "region";
+
+    /**
+     * Creates the S3 client from static credentials found in {@code config}.
+     *
+     * <p>{@link #ACCESS_KEY} and {@link #SECRET_KEY} are mandatory. {@link #REGION} is optional and
+     * defaults to {@code ap-southeast-1}, the region that used to be hard-coded here, so existing
+     * configurations keep working unchanged.
+     *
+     * @param config file-system configuration holding the credentials and, optionally, the region
+     * @throws IllegalArgumentException if the access key or the secret key is missing or empty
+     * @throws RuntimeException if the client cannot be built
+     */
+    @Override
+    public void init(Configuration config) {
+        String accessKey = config.getString(ACCESS_KEY);
+        String secretKey = config.getString(SECRET_KEY);
+        checkArgument(!isNullOrEmpty(accessKey));
+        checkArgument(!isNullOrEmpty(secretKey));
+        // Only an absent key falls back to the default; an explicit value is passed through as-is.
+        String regionId = config.getString(REGION, Region.AP_SOUTHEAST_1.id());
+        try {
+            AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(accessKey, secretKey);
+            s3Client = S3Client.builder()
+                    .region(Region.of(regionId))
+                    .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
+                    .build();
+        } catch (S3Exception e) {
+            throw new RuntimeException("Could not initialize S3PinotFS", e);
+        }
+    }
+
+    /**
+     * Sends a HEAD request for the object addressed by {@code uri}.
+     *
+     * <p>The bucket is the URI host; the key is the sanitized path of {@code uri} relative to the
+     * bucket root.
+     *
+     * @param uri location of the object
+     * @return the response returned by the S3 client for the HEAD request
+     * @throws IOException if the bucket root URI cannot be built
+     */
+    private HeadObjectResponse getS3ObjectMetadata(URI uri) throws IOException {
+        String objectKey = sanitizePath(getBase(uri).relativize(uri).getPath());
+        HeadObjectRequest.Builder request = HeadObjectRequest.builder();
+        request.bucket(uri.getHost());
+        request.key(objectKey);
+        return s3Client.headObject(request.build());
+    }
+
+    /**
+     * Tells whether the path component of {@code uri} ends with the delimiter.
+     *
+     * @param uri URI whose path is inspected
+     * @return {@code true} when the path ends with {@code DELIMITER}
+     */
+    private boolean isPathTerminatedByDelimiter(URI uri) {
+        String path = uri.getPath();
+        return path.endsWith(DELIMITER);
+    }
+
+    /**
+     * Converts {@code uri} into a sanitized key prefix that ends with the delimiter.
+     *
+     * @param uri location to convert; must not be {@code null}
+     * @return the sanitized path of {@code uri} relative to the bucket root, delimiter-terminated
+     * @throws IOException if the bucket root URI cannot be built
+     */
+    private String normalizeToDirectoryPrefix(URI uri) throws IOException {
+        requireNonNull(uri, "uri is null");
+        URI relative = getBase(uri).relativize(uri);
+        String relativePath = relative.getPath();
+        // Append the delimiter only when it is not already the last character.
+        String directoryPath = isPathTerminatedByDelimiter(relative) ? relativePath : relativePath + DELIMITER;
+        return sanitizePath(directoryPath);
+    }
+
+    /**
+     * Returns {@code uri} with a delimiter-terminated path.
+     *
+     * <p>A URI whose path already ends with the delimiter is returned unchanged. Otherwise a new URI
+     * is built from the scheme, the host and the sanitized path with a trailing delimiter.
+     *
+     * @param uri location to normalize
+     * @return a URI whose path ends with the delimiter
+     * @throws IOException if the normalized URI is not syntactically valid
+     */
+    private URI normalizeToDirectoryUri(URI uri) throws IOException {
+        if (isPathTerminatedByDelimiter(uri)) {
+            return uri;
+        }
+        String directoryPath = sanitizePath(uri.getPath() + DELIMITER);
+        // sanitizePath() strips the leading delimiter, but java.net.URI rejects a non-empty path
+        // that does not start with '/' whenever a scheme is given, so put it back in that case.
+        if (uri.getScheme() != null && !directoryPath.startsWith(DELIMITER)) {
+            directoryPath = DELIMITER + directoryPath;
+        }
+        try {
+            return new URI(uri.getScheme(), uri.getHost(), directoryPath, null);
+        } catch (URISyntaxException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Collapses every run of delimiters into one and removes a leading delimiter.
+     *
+     * <p>A path consisting of just the delimiter is returned as-is.
+     *
+     * @param path raw path
+     * @return the cleaned path
+     */
+    private String sanitizePath(String path) {
+        String collapsed = path.replaceAll(DELIMITER + "+", DELIMITER);
+        boolean isRoot = collapsed.equals(DELIMITER);
+        if (isRoot || !collapsed.startsWith(DELIMITER)) {
+            return collapsed;
+        }
+        return collapsed.substring(1);
+    }
+
+    /**
+     * Builds the URI made of only the scheme and host of {@code uri}.
+     *
+     * @param uri any URI
+     * @return a URI with the same scheme and host and neither path nor fragment
+     * @throws IOException if the resulting URI is not syntactically valid
+     */
+    private URI getBase(URI uri) throws IOException {
+        String scheme = uri.getScheme();
+        String host = uri.getHost();
+        try {
+            return new URI(scheme, host, null, null);
+        } catch (URISyntaxException cause) {
+            throw new IOException(cause);
+        }
+    }
+
+    /**
+     * Checks whether an object exists at {@code uri} by requesting its metadata.
+     *
+     * @param uri location of the object
+     * @return {@code true} if the HEAD request succeeds, {@code false} if S3 reports no such key
+     * @throws IOException on any other S3 failure, or if the bucket root URI cannot be built
+     */
+    private boolean existsFile(URI uri) throws IOException {
+        try {
+            // Only success or failure matters here; the metadata itself is discarded.
+            getS3ObjectMetadata(uri);
+        } catch (NoSuchKeyException e) {
+            return false;
+        } catch (S3Exception e) {
+            throw new IOException(e);
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether {@code uri} is a directory with nothing in it.
+     *
+     * <p>The objects under the directory prefix are listed with a single request. The directory
+     * counts as empty when the response holds no key other than the prefix itself.
+     *
+     * @param uri location to inspect
+     * @return {@code false} if {@code uri} is not a directory or some other key is listed under it
+     * @throws IOException if the directory prefix cannot be derived from {@code uri}
+     */
+    private boolean isEmptyDirectory(URI uri) throws IOException {
+        if (!isDirectory(uri)) {
+            return false;
+        }
+        String prefix = normalizeToDirectoryPrefix(uri);
+        ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getHost());
+        // For the bucket root the request carries no prefix, so the whole bucket is listed.
+        if (!prefix.equals(DELIMITER)) {
+            requestBuilder = requestBuilder.prefix(prefix);
+        }
+        ListObjectsV2Response listing = s3Client.listObjectsV2(requestBuilder.build());
+        return listing.contents().stream().allMatch(entry -> entry.key().equals(prefix));
+    }
+
+    /**
+     * Copies a single object from {@code srcUri} to {@code dstUri} with a server-side copy request.
+     *
+     * <p>The copy source is the URL-encoded {@code host + path} of {@code srcUri}. The destination
+     * bucket is the host of {@code dstUri} and the destination key is its sanitized path.
+     *
+     * @param srcUri location of the object to copy
+     * @param dstUri location the copy is written to
+     * @return whether the HTTP response of the copy request reports success
+     * @throws IOException if the source cannot be encoded or the S3 request fails
+     */
+    private boolean copyFile(URI srcUri, URI dstUri) throws IOException {
+        String encodedUrl;
+        try {
+            encodedUrl = URLEncoder.encode(srcUri.getHost() + srcUri.getPath(), StandardCharsets.UTF_8.toString());
+        } catch (UnsupportedEncodingException e) {
+            // Fail here instead of sending a copy request with a null source.
+            throw new IOException("Could not encode copy source for " + srcUri, e);
+        }
+
+        try {
+            String dstPath = sanitizePath(dstUri.getPath());
+            CopyObjectRequest copyReq = CopyObjectRequest.builder()
+                    .copySource(encodedUrl)
+                    .destinationBucket(dstUri.getHost())
+                    .destinationKey(dstPath)
+                    .build();
+
+            CopyObjectResponse copyObjectResponse = s3Client.copyObject(copyReq);
+            return copyObjectResponse.sdkHttpResponse().isSuccessful();
+        } catch (S3Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Creates a directory marker: an empty object whose key is the delimiter-terminated prefix
+     * derived from {@code uri}.
+     *
+     * <p>The bucket root is treated as always existing, so no request is sent for it.
+     *
+     * @param uri location of the directory to create
+     * @return {@code true} for the bucket root, otherwise whether the HTTP response of the put
+     *     request reports success
+     * @throws IOException if the prefix cannot be derived or the request fails; runtime failures
+     *     (including a {@code null} uri) are wrapped, while JVM {@code Error}s propagate untouched
+     */
+    @Override
+    public boolean mkdir(URI uri) throws IOException {
+        LOGGER.info("mkdir {}", uri);
+        try {
+            requireNonNull(uri, "uri is null");
+            String path = normalizeToDirectoryPrefix(uri);
+            // Bucket root directory already exists and cannot be created
+            if (path.equals(DELIMITER)) {
+                return true;
+            }
+
+            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+                    .bucket(uri.getHost())
+                    .key(path)
+                    .build();
+
+            PutObjectResponse putObjectResponse =
+                    s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
+
+            return putObjectResponse.sdkHttpResponse().isSuccessful();
+        } catch (IOException e) {
+            // Already the declared exception type; rethrow without another wrapper.
+            throw e;
+        } catch (RuntimeException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public boolean delete(URI segmentUri, boolean forceDelete) throws 
IOException {
+        LOGGER.info("Deleting uri {} force {}", segmentUri, forceDelete);
+        try {
+            if (isDirectory(segmentUri)) {
+                if (!forceDelete) {
+                    checkState(isEmptyDirectory(segmentUri), "ForceDelete flag 
is not set and directory '%s' is not empty", segmentUri);
+                }
+                String prefix = normalizeToDirectoryPrefix(segmentUri);
+                ListObjectsV2Response listObjectsV2Response;
+                ListObjectsV2Request.Builder listObjectsV2RequestBuilder = 
ListObjectsV2Request.builder()
+                        .bucket(segmentUri.getHost());
+
+                if (prefix.equals(DELIMITER)) {
+                    ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
+                    listObjectsV2Response = 
s3Client.listObjectsV2(listObjectsV2Request);
+                } else {
+                    ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.prefix(prefix).build();
+                    listObjectsV2Response = 
s3Client.listObjectsV2(listObjectsV2Request);

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to