This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 4c51c49b88e4a408d5cb61baf568c7b707235e42 Author: Wail Alkowaileet <[email protected]> AuthorDate: Fri Jul 19 16:55:42 2024 -0700 [ASTERIXDB-3465][STO] Introduce cloud request limiter - user model changes: no - storage format changes: no - interface changes: yes Details: Introduce request limiter to prevent exceeding the cloud requests limit of cloud providers. Ext-ref: MB-62795 Change-Id: I874969f739e023203bfaabccb227b7555b912ccd Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18505 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../api/cluster_state_1/cluster_state_1.1.regexadm | 3 + .../cluster_state_1_full.1.regexadm | 3 + .../cluster_state_1_less.1.regexadm | 3 + .../apache/asterix/cloud/clients/ICloudClient.java | 6 +- .../asterix/cloud/clients/UnstableCloudClient.java | 6 +- .../cloud/clients/aws/s3/S3BufferedWriter.java | 6 +- .../cloud/clients/aws/s3/S3ClientConfig.java | 29 ++++++++- .../cloud/clients/aws/s3/S3CloudClient.java | 15 +++-- .../cloud/clients/aws/s3/S3ParallelDownloader.java | 6 +- .../cloud/clients/aws/s3/S3RequestRateLimiter.java | 58 +++++++++++++++++ .../cloud/clients/google/gcs/GCSCloudClient.java | 45 ++++++------- .../clients/google/gcs/GCSParallelDownloader.java | 8 +-- .../cloud/clients/google/gcs/GCSWriter.java | 6 +- ...filer.java => CountRequestProfilerLimiter.java} | 14 +++- ...tProfiler.java => IRequestProfilerLimiter.java} | 2 +- ...ofiler.java => NoOpRequestProfilerLimiter.java} | 6 +- ...ofiler.java => RequestLimiterNoOpProfiler.java} | 23 ++++--- .../IRateLimiter.java} | 39 +++-------- .../IRequestRateLimiter.java} | 50 ++++++-------- .../NoOpRateLimiter.java} | 37 +++-------- .../NoOpRequestLimiter.java} | 43 +++++------- .../profiler/limiter/TokenBasedRateLimiter.java | 76 ++++++++++++++++++++++ .../asterix/common/config/CloudProperties.java | 26 ++++++++ 23 files changed, 331 insertions(+), 179 deletions(-) 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 805f785601..ad6bfe4ee3 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -9,8 +9,11 @@ "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, "azure.request.timeout" : 120, + "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, "cloud.eviction.plan.reevaluate.threshold" : 50, + "cloud.max.read.requests.per.second" : 4000, + "cloud.max.write.requests.per.second" : 2500, "cloud.profiler.log.interval" : 5, "cloud.storage.allocation.percentage" : 0.8, "cloud.storage.anonymous.auth" : false, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index 35a488af6b..434c135e7d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -9,8 +9,11 @@ "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, "azure.request.timeout" : 120, + "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, "cloud.eviction.plan.reevaluate.threshold" : 50, + "cloud.max.read.requests.per.second" : 4000, + "cloud.max.write.requests.per.second" : 2500, "cloud.profiler.log.interval" : 5, "cloud.storage.allocation.percentage" : 0.8, "cloud.storage.anonymous.auth" : false, diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 13e7a4e13d..296ac47c36 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -9,8 +9,11 @@ "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, "azure.request.timeout" : 120, + "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, "cloud.eviction.plan.reevaluate.threshold" : 50, + "cloud.max.read.requests.per.second" : 4000, + "cloud.max.write.requests.per.second" : 2500, "cloud.profiler.log.interval" : 5, "cloud.storage.allocation.percentage" : 0.8, "cloud.storage.anonymous.auth" : false, diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java index 209245b61e..b2087141be 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java @@ -25,7 +25,7 @@ import java.util.Collection; import java.util.Set; import org.apache.asterix.cloud.IWriteBufferProvider; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.control.nc.io.IOManager; @@ -43,9 +43,9 @@ public interface ICloudClient { int getWriteBufferSize(); /** - * @return the requests profiler + * @return the requests profiler-limiter */ - IRequestProfiler 
getProfiler(); + IRequestProfilerLimiter getProfilerLimiter(); /** * Creates a cloud buffered writer diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java index 2ec5fb5157..4e1c0f7e12 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java @@ -29,7 +29,7 @@ import java.util.Set; import org.apache.asterix.cloud.CloudResettableInputStream; import org.apache.asterix.cloud.IWriteBufferProvider; import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.control.nc.io.IOManager; @@ -53,8 +53,8 @@ public class UnstableCloudClient implements ICloudClient { } @Override - public IRequestProfiler getProfiler() { - return cloudClient.getProfiler(); + public IRequestProfilerLimiter getProfilerLimiter() { + return cloudClient.getProfilerLimiter(); } @Override diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java index 940a0a6db6..53d6546919 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudGuardian; -import 
org.apache.asterix.cloud.clients.profiler.IRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; import org.apache.logging.log4j.LogManager; @@ -47,7 +47,7 @@ public class S3BufferedWriter implements ICloudBufferedWriter { private static final Logger LOGGER = LogManager.getLogger(); private final S3Client s3Client; - private final IRequestProfiler profiler; + private final IRequestProfilerLimiter profiler; private final ICloudGuardian guardian; private final String bucket; private final String path; @@ -56,7 +56,7 @@ public class S3BufferedWriter implements ICloudBufferedWriter { private String uploadId; private int partNumber; - public S3BufferedWriter(S3Client s3client, IRequestProfiler profiler, ICloudGuardian guardian, String bucket, + public S3BufferedWriter(S3Client s3client, IRequestProfilerLimiter profiler, ICloudGuardian guardian, String bucket, String path) { this.s3Client = s3client; this.profiler = profiler; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java index fe73a0add2..0b9b15c26e 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java @@ -38,21 +38,35 @@ public final class S3ClientConfig { private final boolean anonymousAuth; private final long profilerLogInterval; private final int writeBufferSize; + private final long tokenAcquireTimeout; + private final int readMaxRequestsPerSeconds; + private final int writeMaxRequestsPerSeconds; public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, long profilerLogInterval, int writeBufferSize) { 
+ this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0); + } + + private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, + long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, + int readMaxRequestsPerSeconds) { this.region = Objects.requireNonNull(region, "region"); this.endpoint = endpoint; this.prefix = Objects.requireNonNull(prefix, "prefix"); this.anonymousAuth = anonymousAuth; this.profilerLogInterval = profilerLogInterval; this.writeBufferSize = writeBufferSize; + this.tokenAcquireTimeout = tokenAcquireTimeout; + this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds; + this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds; } public static S3ClientConfig of(CloudProperties cloudProperties) { return new S3ClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(), cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(), - cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize()); + cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(), + cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(), + cloudProperties.getReadMaxRequestsPerSecond()); } public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) { @@ -98,7 +112,20 @@ public final class S3ClientConfig { return writeBufferSize; } + public long getTokenAcquireTimeout() { + return tokenAcquireTimeout; + } + + public int getWriteMaxRequestsPerSeconds() { + return writeMaxRequestsPerSeconds; + } + + public int getReadMaxRequestsPerSeconds() { + return readMaxRequestsPerSeconds; + } + private boolean isS3Mock() { return endpoint != null && !endpoint.isEmpty(); } + } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index a46b61f645..24d5fa9a3d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -43,9 +43,9 @@ import org.apache.asterix.cloud.clients.ICloudClient; import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.asterix.cloud.clients.IParallelDownloader; -import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; -import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; +import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.util.IoUtil; @@ -78,7 +78,7 @@ public final class S3CloudClient implements ICloudClient { private final S3ClientConfig config; private final S3Client s3Client; private final ICloudGuardian guardian; - private final IRequestProfiler profiler; + private final IRequestProfilerLimiter profiler; private final int writeBufferSize; public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) { @@ -91,10 +91,11 @@ public final class S3CloudClient implements ICloudClient { this.guardian = guardian; this.writeBufferSize = config.getWriteBufferSize(); long profilerInterval = config.getProfilerLogInterval(); + S3RequestRateLimiter limiter = new S3RequestRateLimiter(config); if (profilerInterval > 0) { - profiler = new CountRequestProfiler(profilerInterval); + profiler = new CountRequestProfilerLimiter(profilerInterval, 
limiter); } else { - profiler = NoOpRequestProfiler.INSTANCE; + profiler = new RequestLimiterNoOpProfiler(limiter); } guardian.setCloudClient(this); } @@ -105,7 +106,7 @@ public final class S3CloudClient implements ICloudClient { } @Override - public IRequestProfiler getProfiler() { + public IRequestProfilerLimiter getProfilerLimiter() { return profiler; } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java index 005fa5a1a2..2eb9f0908d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java @@ -30,7 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.asterix.cloud.clients.IParallelDownloader; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -58,9 +58,9 @@ class S3ParallelDownloader implements IParallelDownloader { private final S3AsyncClient s3AsyncClient; private final S3TransferManager transferManager; private final S3ClientConfig config; - private final IRequestProfiler profiler; + private final IRequestProfilerLimiter profiler; - S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfiler profiler) { + S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) { this.bucket = bucket; this.ioManager = ioManager; this.config = config; diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3RequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3RequestRateLimiter.java new file mode 100644 index 0000000000..37387a664a --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3RequestRateLimiter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.cloud.clients.aws.s3; + +import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter; + +public final class S3RequestRateLimiter implements IRequestRateLimiter { + private final IRateLimiter writeLimiter; + private final IRateLimiter readLimiter; + + public S3RequestRateLimiter(S3ClientConfig config) { + long tokenAcquireTimeout = config.getTokenAcquireTimeout(); + this.writeLimiter = createLimiter(config.getWriteMaxRequestsPerSeconds(), tokenAcquireTimeout); + this.readLimiter = createLimiter(config.getReadMaxRequestsPerSeconds(), tokenAcquireTimeout); + } + + @Override + public void writeRequest() { + writeLimiter.acquire(); + } + + @Override + public void readRequest() { + readLimiter.acquire(); + } + + @Override + public void listRequest() { + // List requests in S3 are considered as PUT + writeLimiter.acquire(); + } + + private static IRateLimiter createLimiter(int maxRequestsPerSecond, long tokenAcquireTimeout) { + if (maxRequestsPerSecond > 0) { + return new TokenBasedRateLimiter(maxRequestsPerSecond, tokenAcquireTimeout); + } + return NoOpRateLimiter.INSTANCE; + } +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index 2874b4e61a..de242bd51d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -38,9 +38,10 @@ import org.apache.asterix.cloud.clients.ICloudClient; import org.apache.asterix.cloud.clients.ICloudGuardian; import 
org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.asterix.cloud.clients.IParallelDownloader; -import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; -import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; +import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfilerLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRequestLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.util.IoUtil; @@ -66,7 +67,7 @@ public class GCSCloudClient implements ICloudClient { private final Storage gcsClient; private final GCSClientConfig config; private final ICloudGuardian guardian; - private final IRequestProfiler profiler; + private final IRequestProfilerLimiter profilerLimiter; public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) { this.gcsClient = gcsClient; @@ -74,9 +75,9 @@ public class GCSCloudClient implements ICloudClient { this.guardian = guardian; long profilerInterval = config.getProfilerLogInterval(); if (profilerInterval > 0) { - profiler = new CountRequestProfiler(profilerInterval); + profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, NoOpRequestLimiter.INSTANCE); } else { - profiler = NoOpRequestProfiler.INSTANCE; + profilerLimiter = NoOpRequestProfilerLimiter.INSTANCE; } guardian.setCloudClient(this); } @@ -91,19 +92,19 @@ public class GCSCloudClient implements ICloudClient { } @Override - public IRequestProfiler getProfiler() { - return profiler; + public IRequestProfilerLimiter getProfilerLimiter() { + return profilerLimiter; } @Override public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider 
bufferProvider) { - return new GCSWriter(bucket, path, gcsClient, profiler); + return new GCSWriter(bucket, path, gcsClient, profilerLimiter); } @Override public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { guardian.checkReadAccess(bucket, path); - profiler.objectsList(); + profilerLimiter.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE)); @@ -118,7 +119,7 @@ public class GCSCloudClient implements ICloudClient { @Override public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException { - profiler.objectGet(); + profilerLimiter.objectGet(); BlobId blobId = BlobId.of(bucket, path); long readTo = offset + buffer.remaining(); int totalRead = 0; @@ -139,7 +140,7 @@ public class GCSCloudClient implements ICloudClient { @Override public byte[] readAllBytes(String bucket, String path) { - profiler.objectGet(); + profilerLimiter.objectGet(); BlobId blobId = BlobId.of(bucket, path); try { return gcsClient.readAllBytes(blobId); @@ -150,7 +151,7 @@ public class GCSCloudClient implements ICloudClient { @Override public InputStream getObjectStream(String bucket, String path, long offset, long length) { - profiler.objectGet(); + profilerLimiter.objectGet(); try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) { reader.seek(offset); return Channels.newInputStream(reader); @@ -162,7 +163,7 @@ public class GCSCloudClient implements ICloudClient { @Override public void write(String bucket, String path, byte[] data) { guardian.checkWriteAccess(bucket, path); - profiler.objectWrite(); + profilerLimiter.objectWrite(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build(); gcsClient.create(blobInfo, data); } @@ -170,9 +171,9 @@ public class GCSCloudClient implements ICloudClient { @Override public void copy(String bucket, String srcPath, FileReference destPath) { Page<Blob> blobs = 
gcsClient.list(bucket, BlobListOption.prefix(srcPath)); - profiler.objectsList(); + profilerLimiter.objectsList(); for (Blob blob : blobs.iterateAll()) { - profiler.objectCopy(); + profilerLimiter.objectCopy(); BlobId source = blob.getBlobId(); String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); BlobId target = BlobId.of(bucket, targetName); @@ -199,14 +200,14 @@ public class GCSCloudClient implements ICloudClient { } batchRequest.submit(); - profiler.objectDelete(); + profilerLimiter.objectDelete(); } } @Override public long getObjectSize(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profiler.objectGet(); + profilerLimiter.objectGet(); Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); if (blob == null) { return 0; @@ -217,7 +218,7 @@ public class GCSCloudClient implements ICloudClient { @Override public boolean exists(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profiler.objectGet(); + profilerLimiter.objectGet(); Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values())); return blob != null && blob.exists(); } @@ -225,7 +226,7 @@ public class GCSCloudClient implements ICloudClient { @Override public boolean isEmptyPrefix(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profiler.objectsList(); + profilerLimiter.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path)); return !blobs.hasNextPage(); } @@ -233,13 +234,13 @@ public class GCSCloudClient implements ICloudClient { @Override public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) throws HyracksDataException { - return new GCSParallelDownloader(bucket, ioManager, config, profiler); + return new GCSParallelDownloader(bucket, ioManager, config, profilerLimiter); } @Override public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) { 
guardian.checkReadAccess(bucket, "/"); - profiler.objectsList(); + profilerLimiter.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)); ArrayNode objectsInfo = objectMapper.createArrayNode(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java index 281a85593a..0994cea2c7 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java @@ -29,7 +29,7 @@ import java.util.Map; import java.util.Set; import org.apache.asterix.cloud.clients.IParallelDownloader; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -55,10 +55,10 @@ public class GCSParallelDownloader implements IParallelDownloader { private final IOManager ioManager; private final Storage gcsClient; private final TransferManager transferManager; - private final IRequestProfiler profiler; + private final IRequestProfilerLimiter profiler; - public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config, IRequestProfiler profiler) - throws HyracksDataException { + public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config, + IRequestProfilerLimiter profiler) throws HyracksDataException { this.bucket = bucket; this.ioManager = ioManager; this.profiler = profiler; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java index d9119a58f7..41d1a71ddf 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.asterix.cloud.clients.ICloudWriter; -import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,12 +38,12 @@ public class GCSWriter implements ICloudWriter { private static final Logger LOGGER = LogManager.getLogger(); private final String bucket; private final String path; - private final IRequestProfiler profiler; + private final IRequestProfilerLimiter profiler; private final Storage gcsClient; private WriteChannel writer = null; private long writtenBytes; - public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) { + public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfilerLimiter profiler) { this.bucket = bucket; this.path = path; this.profiler = profiler; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfilerLimiter.java similarity index 89% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfilerLimiter.java index 3fc378cc17..16ffb7a18e 100644 --- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfilerLimiter.java @@ -20,6 +20,7 @@ package org.apache.asterix.cloud.clients.profiler; import java.util.concurrent.atomic.AtomicLong; +import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,11 +28,12 @@ import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -public class CountRequestProfiler implements IRequestProfiler { +public class CountRequestProfilerLimiter implements IRequestProfilerLimiter { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final Logger LOGGER = LogManager.getLogger(); private static final Level LOG_LEVEL = Level.TRACE; private final long logInterval; + private final IRequestRateLimiter limiter; private final AtomicLong listObjectsCounter; private final AtomicLong getObjectCounter; private final AtomicLong writeObjectCounter; @@ -41,8 +43,9 @@ public class CountRequestProfiler implements IRequestProfiler { private final AtomicLong multipartDownloadCounter; private long lastLogTimestamp; - public CountRequestProfiler(long logIntervalNanoSec) { + public CountRequestProfilerLimiter(long logIntervalNanoSec, IRequestRateLimiter limiter) { this.logInterval = logIntervalNanoSec; + this.limiter = limiter; listObjectsCounter = new AtomicLong(); getObjectCounter = new AtomicLong(); writeObjectCounter = new AtomicLong(); @@ -55,42 +58,49 @@ public class CountRequestProfiler implements IRequestProfiler { @Override public void objectsList() { + limiter.listRequest(); listObjectsCounter.incrementAndGet(); log(); } @Override public void objectGet() { + limiter.readRequest(); 
getObjectCounter.incrementAndGet(); log(); } @Override public void objectWrite() { + limiter.writeRequest(); writeObjectCounter.incrementAndGet(); log(); } @Override public void objectDelete() { + limiter.writeRequest(); deleteObjectCounter.incrementAndGet(); log(); } @Override public void objectCopy() { + limiter.writeRequest(); copyObjectCounter.incrementAndGet(); log(); } @Override public void objectMultipartUpload() { + limiter.writeRequest(); multipartUploadCounter.incrementAndGet(); log(); } @Override public void objectMultipartDownload() { + limiter.readRequest(); multipartDownloadCounter.incrementAndGet(); log(); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfilerLimiter.java similarity index 96% copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfilerLimiter.java index a23a78f41f..b86cd485d2 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfilerLimiter.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.cloud.clients.profiler; -public interface IRequestProfiler { +public interface IRequestProfilerLimiter { void objectsList(); void objectGet(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfilerLimiter.java similarity index 90% copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java copy to 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfilerLimiter.java index f7e89c3643..ab658f50ac 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfilerLimiter.java @@ -18,10 +18,10 @@ */ package org.apache.asterix.cloud.clients.profiler; -public class NoOpRequestProfiler implements IRequestProfiler { - public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler(); +public class NoOpRequestProfilerLimiter implements IRequestProfilerLimiter { + public static final IRequestProfilerLimiter INSTANCE = new NoOpRequestProfilerLimiter(); - private NoOpRequestProfiler() { + private NoOpRequestProfilerLimiter() { } @Override diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/RequestLimiterNoOpProfiler.java similarity index 77% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/RequestLimiterNoOpProfiler.java index f7e89c3643..cce2f8e0f7 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/RequestLimiterNoOpProfiler.java @@ -18,45 +18,48 @@ */ package org.apache.asterix.cloud.clients.profiler; -public class NoOpRequestProfiler implements IRequestProfiler { - public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler(); +import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter; - private NoOpRequestProfiler() { +public final class RequestLimiterNoOpProfiler implements 
IRequestProfilerLimiter { + private final IRequestRateLimiter limiter; + + public RequestLimiterNoOpProfiler(IRequestRateLimiter limiter) { + this.limiter = limiter; } @Override public void objectsList() { - // NoOp + limiter.listRequest(); } @Override public void objectGet() { - // NoOp + limiter.readRequest(); } @Override public void objectWrite() { - // NoOp + limiter.writeRequest(); } @Override public void objectDelete() { - // NoOp + limiter.writeRequest(); } @Override public void objectCopy() { - // NoOp + limiter.writeRequest(); } @Override public void objectMultipartUpload() { - // NoOp + limiter.writeRequest(); } @Override public void objectMultipartDownload() { - // NoOp + limiter.readRequest(); } @Override diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRateLimiter.java similarity index 61% copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRateLimiter.java index a23a78f41f..741735bb21 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRateLimiter.java @@ -16,35 +16,14 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud.clients.profiler; - -public interface IRequestProfiler { - void objectsList(); - - void objectGet(); - - void objectWrite(); - - void objectDelete(); - - void objectCopy(); - - void objectMultipartUpload(); - - void objectMultipartDownload(); - - long objectsListCount(); - - long objectGetCount(); - - long objectWriteCount(); - - long objectDeleteCount(); - - long objectCopyCount(); - - long objectMultipartUploadCount(); - - long objectMultipartDownloadCount(); +package org.apache.asterix.cloud.clients.profiler.limiter; +/** + * Rate limiter + */ +public interface IRateLimiter { + /** + * Acquire permit or wait if rate limit exceeded + */ + void acquire(); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRequestRateLimiter.java similarity index 61% copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRequestRateLimiter.java index a23a78f41f..98b2eab558 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRequestRateLimiter.java @@ -16,35 +16,25 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud.clients.profiler; - -public interface IRequestProfiler { - void objectsList(); - - void objectGet(); - - void objectWrite(); - - void objectDelete(); - - void objectCopy(); - - void objectMultipartUpload(); - - void objectMultipartDownload(); - - long objectsListCount(); - - long objectGetCount(); - - long objectWriteCount(); - - long objectDeleteCount(); - - long objectCopyCount(); - - long objectMultipartUploadCount(); - - long objectMultipartDownloadCount(); +package org.apache.asterix.cloud.clients.profiler.limiter; +/** + * Rate limiter for Cloud request. If the number of requests per seconds exceeds the provided limit, then + * the requester threads will be throttled. + */ +public interface IRequestRateLimiter { + /** + * Perform a write request + */ + void writeRequest(); + + /** + * Perform a read request + */ + void readRequest(); + + /** + * Perform a list request + */ + void listRequest(); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRateLimiter.java similarity index 61% copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRateLimiter.java index a23a78f41f..4dd8c88997 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRateLimiter.java @@ -16,35 +16,16 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud.clients.profiler; +package org.apache.asterix.cloud.clients.profiler.limiter; -public interface IRequestProfiler { - void objectsList(); +public class NoOpRateLimiter implements IRateLimiter { + public static final NoOpRateLimiter INSTANCE = new NoOpRateLimiter(); - void objectGet(); - - void objectWrite(); - - void objectDelete(); - - void objectCopy(); - - void objectMultipartUpload(); - - void objectMultipartDownload(); - - long objectsListCount(); - - long objectGetCount(); - - long objectWriteCount(); - - long objectDeleteCount(); - - long objectCopyCount(); - - long objectMultipartUploadCount(); - - long objectMultipartDownloadCount(); + private NoOpRateLimiter() { + } + @Override + public void acquire() { + // NoOp + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRequestLimiter.java similarity index 61% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRequestLimiter.java index a23a78f41f..ea89a2e68d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRequestLimiter.java @@ -16,35 +16,26 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud.clients.profiler; +package org.apache.asterix.cloud.clients.profiler.limiter; -public interface IRequestProfiler { - void objectsList(); +public final class NoOpRequestLimiter implements IRequestRateLimiter { + public static final IRequestRateLimiter INSTANCE = new NoOpRequestLimiter(); - void objectGet(); + private NoOpRequestLimiter() { + } - void objectWrite(); + @Override + public void writeRequest() { + // NoOp + } - void objectDelete(); - - void objectCopy(); - - void objectMultipartUpload(); - - void objectMultipartDownload(); - - long objectsListCount(); - - long objectGetCount(); - - long objectWriteCount(); - - long objectDeleteCount(); - - long objectCopyCount(); - - long objectMultipartUploadCount(); - - long objectMultipartDownloadCount(); + @Override + public void readRequest() { + // NoOp + } + @Override + public void listRequest() { + // NoOp + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/TokenBasedRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/TokenBasedRateLimiter.java new file mode 100644 index 0000000000..a0273fb523 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/TokenBasedRateLimiter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients.profiler.limiter; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public final class TokenBasedRateLimiter implements IRateLimiter { + private static final Logger LOGGER = LogManager.getLogger(); + private static final long SECOND_NANO = TimeUnit.SECONDS.toNanos(1); + private final long acquireTimeoutNano; + private final int maxTokensPerSecond; + private final Semaphore semaphore; + private final AtomicLong lastRefillTime; + + /** + * Token-based request limiter + * + * @param maxRequestsPerSecond maximum number of requests per seconds + * @param acquireTimeoutMillis timeout to refill and retry acquiring a token + */ + public TokenBasedRateLimiter(int maxRequestsPerSecond, long acquireTimeoutMillis) { + this.maxTokensPerSecond = maxRequestsPerSecond; + this.acquireTimeoutNano = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis); + this.semaphore = new Semaphore(maxRequestsPerSecond); + this.lastRefillTime = new AtomicLong(System.nanoTime()); + } + + @Override + public void acquire() { + while (true) { + refillTokens(); + try { + if (semaphore.tryAcquire(acquireTimeoutNano, TimeUnit.NANOSECONDS)) { + return; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.debug("Interrupted while waiting for acquiring a request token", e); + return; + } + } + } + + private void refillTokens() { + long 
refillTime = lastRefillTime.get(); + long now = System.nanoTime(); + long elapsedTime = now - refillTime; + if (elapsedTime > SECOND_NANO && lastRefillTime.compareAndSet(refillTime, now)) { + int delta = maxTokensPerSecond - semaphore.availablePermits(); + if (delta > 0) { + semaphore.release(delta); + } + } + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java index a4d3b7ab56..5e8d22d165 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java @@ -58,6 +58,9 @@ public class CloudProperties extends AbstractProperties { CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false), CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, GIGABYTE)), CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 5), + CLOUD_ACQUIRE_TOKEN_TIMEOUT(POSITIVE_INTEGER, 100), + CLOUD_MAX_WRITE_REQUESTS_PER_SECOND(NONNEGATIVE_INTEGER, 2500), + CLOUD_MAX_READ_REQUESTS_PER_SECOND(NONNEGATIVE_INTEGER, 4000), CLOUD_WRITE_BUFFER_SIZE( getRangedIntegerType(5, Integer.MAX_VALUE), StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE)), @@ -88,6 +91,9 @@ public class CloudProperties extends AbstractProperties { case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE: case CLOUD_STORAGE_DEBUG_MODE_ENABLED: case CLOUD_PROFILER_LOG_INTERVAL: + case CLOUD_ACQUIRE_TOKEN_TIMEOUT: + case CLOUD_MAX_WRITE_REQUESTS_PER_SECOND: + case CLOUD_MAX_READ_REQUESTS_PER_SECOND: case CLOUD_WRITE_BUFFER_SIZE: case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD: return Section.COMMON; @@ -146,6 +152,13 @@ public class CloudProperties extends AbstractProperties { return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means" + " the profiler is disabled by default). 
The minimum is 1 minute." + " NOTE: Enabling the profiler could perturb the performance of cloud requests"; + case CLOUD_ACQUIRE_TOKEN_TIMEOUT: + return "The waiting time (in milliseconds) if a requesting thread failed to acquire a token if the" + + " rate limit of cloud requests exceeded (default: 100, min: 1, and max: 5000)"; + case CLOUD_MAX_WRITE_REQUESTS_PER_SECOND: + return "The maximum number of write requests per second (default: 2500, 0 means unlimited)"; + case CLOUD_MAX_READ_REQUESTS_PER_SECOND: + return "The maximum number of read requests per second (default: 4000, 0 means unlimited)"; case CLOUD_WRITE_BUFFER_SIZE: return "The write buffer size in bytes. (default: 8MB, min: 5MB)"; case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD: @@ -225,6 +238,19 @@ public class CloudProperties extends AbstractProperties { return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1)); } + public long getTokenAcquireTimeout() { + int time = accessor.getInt(Option.CLOUD_ACQUIRE_TOKEN_TIMEOUT); + return Math.min(time, 5000); + } + + public int getWriteMaxRequestsPerSecond() { + return accessor.getInt(Option.CLOUD_MAX_WRITE_REQUESTS_PER_SECOND); + } + + public int getReadMaxRequestsPerSecond() { + return accessor.getInt(Option.CLOUD_MAX_READ_REQUESTS_PER_SECOND); + } + public int getWriteBufferSize() { return accessor.getInt(Option.CLOUD_WRITE_BUFFER_SIZE); }
