This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a4160334f8 [NO ISSUE]: Add Guardian to GCSWriter, Request Limits for GCS
a4160334f8 is described below

commit a4160334f836637261aa85c16fa7d0ec00ac88ed
Author: Savyasach Reddy <[email protected]>
AuthorDate: Mon Aug 12 16:32:45 2024 +0530

    [NO ISSUE]: Add Guardian to GCSWriter, Request Limits for GCS
    
    Ext-ref: MB-63055
    Change-Id: Id639afcadb1d88b4630e12dde40dbaae94e15f23
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18645
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
---
 .../test/cloud_storage/CloudStorageGCSTest.java    |  2 -
 .../src/test/resources/cc-cloud-storage-gcs.conf   |  3 ++
 .../cloud/clients/google/gcs/GCSClientConfig.java  | 54 ++++++++++++++------
 .../cloud/clients/google/gcs/GCSCloudClient.java   | 20 +++++---
 .../clients/google/gcs/GCSRequestRateLimiter.java  | 57 ++++++++++++++++++++++
 .../cloud/clients/google/gcs/GCSWriter.java        | 15 ++++--
 .../cloud/writer/GCSExternalFileWriterFactory.java |  2 +-
 .../org/apache/asterix/cloud/gcs/LSMGCSTest.java   |  5 +-
 8 files changed, 128 insertions(+), 30 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
index 3a0344558d..89a4781955 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -36,7 +36,6 @@ import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.MethodSorters;
@@ -55,7 +54,6 @@ import com.google.cloud.storage.StorageOptions;
  */
 @RunWith(Parameterized.class)
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
-@Ignore
 public class CloudStorageGCSTest {
 
     private static final Logger LOGGER = LogManager.getLogger();
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
index 3c883a86c1..004664498e 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
@@ -70,3 +70,6 @@ cloud.storage.region=us-west2
 cloud.storage.endpoint=http://127.0.0.1:4443
 cloud.storage.anonymous.auth=true
 cloud.storage.cache.policy=selective
+cloud.max.write.requests.per.second=1000
+cloud.max.read.requests.per.second=5000
+cloud.write.buffer.size=5
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index 4edb7a7175..fe5dd4db8a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -25,38 +25,50 @@ import java.util.Map;
 
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.StorageUtil;
 
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.auth.oauth2.OAuth2Credentials;
 import com.google.cloud.NoCredentials;
 
 public class GCSClientConfig {
-    public static final int WRITE_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE);
+
     // The maximum number of files that can be deleted (GCS restriction): 
https://cloud.google.com/storage/quotas#json-requests
     static final int DELETE_BATCH_SIZE = 100;
     private final String region;
     private final String endpoint;
-    private final String prefix;
     private final boolean anonymousAuth;
     private final long profilerLogInterval;
-
-    public GCSClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
-            long profilerLogInterval) {
+    private final long tokenAcquireTimeout;
+    private final int readMaxRequestsPerSeconds;
+    private final int writeMaxRequestsPerSeconds;
+    private final int writeBufferSize;
+
+    private GCSClientConfig(String region, String endpoint, boolean 
anonymousAuth, long profilerLogInterval,
+            long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int 
readMaxRequestsPerSeconds,
+            int writeBufferSize) {
         this.region = region;
         this.endpoint = endpoint;
-        this.prefix = prefix;
         this.anonymousAuth = anonymousAuth;
         this.profilerLogInterval = profilerLogInterval;
+        this.tokenAcquireTimeout = tokenAcquireTimeout;
+        this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
+        this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
+        this.writeBufferSize = writeBufferSize;
+    }
+
+    public GCSClientConfig(String region, String endpoint, boolean 
anonymousAuth, long profilerLogInterval,
+            int writeBufferSize) {
+        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, 
writeBufferSize);
     }
 
     public static GCSClientConfig of(CloudProperties cloudProperties) {
         return new GCSClientConfig(cloudProperties.getStorageRegion(), 
cloudProperties.getStorageEndpoint(),
-                cloudProperties.getStoragePrefix(), 
cloudProperties.isStorageAnonymousAuth(),
-                cloudProperties.getProfilerLogInterval());
+                cloudProperties.isStorageAnonymousAuth(), 
cloudProperties.getProfilerLogInterval(),
+                cloudProperties.getTokenAcquireTimeout(), 
cloudProperties.getWriteMaxRequestsPerSecond(),
+                cloudProperties.getReadMaxRequestsPerSecond(), 
cloudProperties.getWriteBufferSize());
     }
 
-    public static GCSClientConfig of(Map<String, String> configuration) {
+    public static GCSClientConfig of(Map<String, String> configuration, int 
writeBufferSize) {
         String endPoint = configuration.getOrDefault(ENDPOINT_FIELD_NAME, "");
         long profilerLogInterval = 0;
 
@@ -64,7 +76,7 @@ public class GCSClientConfig {
         String prefix = "";
         boolean anonymousAuth = false;
 
-        return new GCSClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval);
+        return new GCSClientConfig(region, endPoint, anonymousAuth, 
profilerLogInterval, writeBufferSize);
     }
 
     public String getRegion() {
@@ -75,10 +87,6 @@ public class GCSClientConfig {
         return endpoint;
     }
 
-    public String getPrefix() {
-        return prefix;
-    }
-
     public long getProfilerLogInterval() {
         return profilerLogInterval;
     }
@@ -94,4 +102,20 @@ public class GCSClientConfig {
             throw HyracksDataException.create(e);
         }
     }
+
+    public long getTokenAcquireTimeout() {
+        return tokenAcquireTimeout;
+    }
+
+    public int getWriteMaxRequestsPerSeconds() {
+        return writeMaxRequestsPerSeconds;
+    }
+
+    public int getReadMaxRequestsPerSeconds() {
+        return readMaxRequestsPerSeconds;
+    }
+
+    public int getWriteBufferSize() {
+        return writeBufferSize;
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index de242bd51d..010a6bb185 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -40,8 +40,7 @@ import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
-import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfilerLimiter;
-import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRequestLimiter;
+import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
@@ -68,16 +67,19 @@ public class GCSCloudClient implements ICloudClient {
     private final GCSClientConfig config;
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profilerLimiter;
+    private final int writeBufferSize;
 
     public GCSCloudClient(GCSClientConfig config, Storage gcsClient, 
ICloudGuardian guardian) {
         this.gcsClient = gcsClient;
         this.config = config;
         this.guardian = guardian;
+        this.writeBufferSize = config.getWriteBufferSize();
         long profilerInterval = config.getProfilerLogInterval();
+        GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
         if (profilerInterval > 0) {
-            profilerLimiter = new 
CountRequestProfilerLimiter(profilerInterval, NoOpRequestLimiter.INSTANCE);
+            profilerLimiter = new 
CountRequestProfilerLimiter(profilerInterval, limiter);
         } else {
-            profilerLimiter = NoOpRequestProfilerLimiter.INSTANCE;
+            profilerLimiter = new RequestLimiterNoOpProfiler(limiter);
         }
         guardian.setCloudClient(this);
     }
@@ -88,7 +90,7 @@ public class GCSCloudClient implements ICloudClient {
 
     @Override
     public int getWriteBufferSize() {
-        return GCSClientConfig.WRITE_BUFFER_SIZE;
+        return writeBufferSize;
     }
 
     @Override
@@ -98,7 +100,7 @@ public class GCSCloudClient implements ICloudClient {
 
     @Override
     public ICloudWriter createWriter(String bucket, String path, 
IWriteBufferProvider bufferProvider) {
-        return new GCSWriter(bucket, path, gcsClient, profilerLimiter);
+        return new GCSWriter(bucket, path, gcsClient, profilerLimiter, 
guardian, writeBufferSize);
     }
 
     @Override
@@ -119,6 +121,7 @@ public class GCSCloudClient implements ICloudClient {
 
     @Override
     public int read(String bucket, String path, long offset, ByteBuffer 
buffer) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, path);
         long readTo = offset + buffer.remaining();
@@ -140,6 +143,7 @@ public class GCSCloudClient implements ICloudClient {
 
     @Override
     public byte[] readAllBytes(String bucket, String path) {
+        guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, path);
         try {
@@ -151,6 +155,7 @@ public class GCSCloudClient implements ICloudClient {
 
     @Override
     public InputStream getObjectStream(String bucket, String path, long 
offset, long length) {
+        guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset 
+ length)) {
             reader.seek(offset);
@@ -170,8 +175,9 @@ public class GCSCloudClient implements ICloudClient {
 
     @Override
     public void copy(String bucket, String srcPath, FileReference destPath) {
-        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(srcPath));
+        guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
+        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(srcPath));
         for (Blob blob : blobs.iterateAll()) {
             profilerLimiter.objectCopy();
             BlobId source = blob.getBlobId();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java
new file mode 100644
index 0000000000..71f6b8ccd0
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter;
+
+public class GCSRequestRateLimiter implements IRequestRateLimiter {
+    private final IRateLimiter writeLimiter;
+    private final IRateLimiter readLimiter;
+
+    public GCSRequestRateLimiter(GCSClientConfig config) {
+        long tokenAcquireTimeout = config.getTokenAcquireTimeout();
+        this.writeLimiter = 
createLimiter(config.getWriteMaxRequestsPerSeconds(), tokenAcquireTimeout);
+        this.readLimiter = 
createLimiter(config.getReadMaxRequestsPerSeconds(), tokenAcquireTimeout);
+    }
+
+    @Override
+    public void writeRequest() {
+        writeLimiter.acquire();
+    }
+
+    @Override
+    public void readRequest() {
+        readLimiter.acquire();
+    }
+
+    @Override
+    public void listRequest() {
+        readLimiter.acquire();
+    }
+
+    private static IRateLimiter createLimiter(int maxRequestsPerSecond, long 
tokeAcquireTimeout) {
+        if (maxRequestsPerSecond > 0) {
+            return new TokenBasedRateLimiter(maxRequestsPerSecond, 
tokeAcquireTimeout);
+        }
+        return NoOpRateLimiter.INSTANCE;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 41d1a71ddf..8d68f016d7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -18,11 +18,10 @@
  */
 package org.apache.asterix.cloud.clients.google.gcs;
 
-import static 
org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.WRITE_BUFFER_SIZE;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -40,14 +39,20 @@ public class GCSWriter implements ICloudWriter {
     private final String path;
     private final IRequestProfilerLimiter profiler;
     private final Storage gcsClient;
+    private final ICloudGuardian guardian;
+    private final int writeBufferSize;
+
     private WriteChannel writer = null;
     private long writtenBytes;
 
-    public GCSWriter(String bucket, String path, Storage gcsClient, 
IRequestProfilerLimiter profiler) {
+    public GCSWriter(String bucket, String path, Storage gcsClient, 
IRequestProfilerLimiter profiler,
+            ICloudGuardian guardian, int writeBufferSize) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
         this.gcsClient = gcsClient;
+        this.guardian = guardian;
+        this.writeBufferSize = writeBufferSize;
         writtenBytes = 0;
     }
 
@@ -58,6 +63,7 @@ public class GCSWriter implements ICloudWriter {
 
     @Override
     public int write(ByteBuffer page) throws HyracksDataException {
+        guardian.checkIsolatedWriteAccess(bucket, path);
         profiler.objectMultipartUpload();
         setUploadId();
         int written = 0;
@@ -93,6 +99,7 @@ public class GCSWriter implements ICloudWriter {
 
     @Override
     public void finish() throws HyracksDataException {
+        guardian.checkWriteAccess(bucket, path);
         setUploadId();
         profiler.objectMultipartUpload();
         try {
@@ -115,7 +122,7 @@ public class GCSWriter implements ICloudWriter {
     private void setUploadId() {
         if (writer == null) {
             writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, 
path)).build());
-            writer.setChunkSize(WRITE_BUFFER_SIZE);
+            writer.setChunkSize(writeBufferSize);
             writtenBytes = 0;
             log("STARTED");
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 9e9c003beb..886f20d12d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -62,7 +62,7 @@ public final class GCSExternalFileWriterFactory extends AbstractCloudExternalFil
 
     @Override
     ICloudClient createCloudClient() throws CompilationException {
-        GCSClientConfig config = GCSClientConfig.of(configuration);
+        GCSClientConfig config = GCSClientConfig.of(configuration, 
writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 3c62cce7a8..09cc3f6112 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -22,6 +22,7 @@ import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.hyracks.util.StorageUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -48,7 +49,9 @@ public class LSMGCSTest extends AbstractLSMTest {
         
client.create(BucketInfo.newBuilder(PLAYGROUND_CONTAINER).setStorageClass(StorageClass.STANDARD)
                 .setLocation(MOCK_SERVER_REGION).build());
         LOGGER.info("Client created successfully");
-        GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, 
MOCK_SERVER_HOSTNAME, "", true, 0);
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
+        GCSClientConfig config =
+                new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
true, 0, writeBufferSize);
         CLOUD_CLIENT = new GCSCloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 

Reply via email to