This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b55e0c04 [ASTERIXDB-3423][CONF] Make cloud write buffer size configurable
93b55e0c04 is described below

commit 93b55e0c04bd6fbc85523383b9e9409e84628dbb
Author: Ali Alsuliman <[email protected]>
AuthorDate: Mon Jun 10 05:55:28 2024 +0300

    [ASTERIXDB-3423][CONF] Make cloud write buffer size configurable
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
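    Details:
    - Make the cloud write buffer size configurable (CLOUD_WRITE_BUFFER_SIZE)
      and change its default from 5MB to 8MB.
    - Enable the cloud profiler by default by setting its log interval
      to 5 minutes (previously 0, i.e. disabled).
    - Make the cloud profiler logging level TRACE.
    - In cloud deployments, reserve the write buffers
      ((max concurrent flushes + max concurrent merges) * write buffer size)
      from the job execution memory budget.
    - Fix the StorageProperties.geMaxConcurrentFlushes typo
      (renamed to getMaxConcurrentFlushes).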
    
    Change-Id: I41955440f0b3525a42e13ed03ff0909fc788e238
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18347
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Ali Alsuliman <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  2 +-
 .../asterix/hyracks/bootstrap/NCApplication.java   |  2 +-
 .../cloud/clients/aws/s3/S3ClientConfig.java       | 15 +++++---
 .../cloud/clients/aws/s3/S3CloudClient.java        |  4 ++-
 .../clients/profiler/CountRequestProfiler.java     | 30 +++++++++-------
 .../org/apache/asterix/cloud/s3/LSMS3Test.java     |  5 ++-
 .../asterix/common/config/CloudProperties.java     | 10 +++++-
 .../asterix/common/config/StorageProperties.java   | 40 +++++++++++++++++-----
 8 files changed, 77 insertions(+), 31 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 18e24ab31c..7da3838838 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -697,7 +697,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         String schedulerName = storageProperties.getIoScheduler();
         int numPartitions = ioManager.getIODevices().size();
 
-        int maxConcurrentFlushes = 
storageProperties.geMaxConcurrentFlushes(numPartitions);
+        int maxConcurrentFlushes = 
storageProperties.getMaxConcurrentFlushes(numPartitions);
         int maxScheduledMerges = 
storageProperties.getMaxScheduledMerges(numPartitions);
         int maxConcurrentMerges = 
storageProperties.getMaxConcurrentMerges(numPartitions);
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index d5659cf090..d02dc4fc09 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -338,7 +338,7 @@ public class NCApplication extends BaseNCApplication {
     @Override
     public NodeCapacity getCapacity() {
         StorageProperties storageProperties = 
runtimeContext.getStorageProperties();
-        final long memorySize = 
storageProperties.getJobExecutionMemoryBudget();
+        final long memorySize = 
storageProperties.getJobExecutionMemoryBudget(runtimeContext);
         int allCores = Runtime.getRuntime().availableProcessors();
         return new NodeCapacity(memorySize, allCores);
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 161fb37fc5..c98e0e8cc4 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -29,7 +29,7 @@ import 
software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 public final class S3ClientConfig {
-    static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
+
     // The maximum number of file that can be deleted (AWS restriction)
     static final int DELETE_BATCH_SIZE = 1000;
     private final String region;
@@ -37,20 +37,22 @@ public final class S3ClientConfig {
     private final String prefix;
     private final boolean anonymousAuth;
     private final long profilerLogInterval;
+    private final int writeBufferSize;
 
     public S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
-            long profilerLogInterval) {
+            long profilerLogInterval, int writeBufferSize) {
         this.region = region;
         this.endpoint = endpoint;
         this.prefix = prefix;
         this.anonymousAuth = anonymousAuth;
         this.profilerLogInterval = profilerLogInterval;
+        this.writeBufferSize = writeBufferSize;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
         return new S3ClientConfig(cloudProperties.getStorageRegion(), 
cloudProperties.getStorageEndpoint(),
                 cloudProperties.getStoragePrefix(), 
cloudProperties.isStorageAnonymousAuth(),
-                cloudProperties.getProfilerLogInterval());
+                cloudProperties.getProfilerLogInterval(), 
cloudProperties.getWriteBufferSize());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration) {
@@ -63,8 +65,9 @@ public final class S3ClientConfig {
         String region = "";
         String prefix = "";
         boolean anonymousAuth = false;
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
 
-        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval);
+        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize);
     }
 
     public String getRegion() {
@@ -92,6 +95,10 @@ public final class S3ClientConfig {
         return profilerLogInterval;
     }
 
+    public int getWriteBufferSize() {
+        return writeBufferSize;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 97169db507..2eae455ba6 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -79,6 +79,7 @@ public final class S3CloudClient implements ICloudClient {
     private final S3Client s3Client;
     private final ICloudGuardian guardian;
     private final IRequestProfiler profiler;
+    private final int writeBufferSize;
 
     public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
         this(config, buildClient(config), guardian);
@@ -88,6 +89,7 @@ public final class S3CloudClient implements ICloudClient {
         this.config = config;
         this.s3Client = s3Client;
         this.guardian = guardian;
+        this.writeBufferSize = config.getWriteBufferSize();
         long profilerInterval = config.getProfilerLogInterval();
         if (profilerInterval > 0) {
             profiler = new CountRequestProfiler(profilerInterval);
@@ -99,7 +101,7 @@ public final class S3CloudClient implements ICloudClient {
 
     @Override
     public int getWriteBufferSize() {
-        return S3ClientConfig.WRITE_BUFFER_SIZE;
+        return writeBufferSize;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
index d5fc2db169..624395bb3f 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -20,6 +20,7 @@ package org.apache.asterix.cloud.clients.profiler;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -29,6 +30,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 public class CountRequestProfiler implements IRequestProfiler {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final Level LOG_LEVEL = Level.TRACE;
     private final long logInterval;
     private final AtomicLong listObjectsCounter;
     private final AtomicLong getObjectCounter;
@@ -94,19 +96,21 @@ public class CountRequestProfiler implements 
IRequestProfiler {
     }
 
     private void log() {
-        long currentTime = System.nanoTime();
-        if (currentTime - lastLogTimestamp >= logInterval) {
-            // Might log multiple times
-            lastLogTimestamp = currentTime;
-            ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
-            countersNode.put("listObjectsCounter", listObjectsCounter.get());
-            countersNode.put("getObjectCounter", getObjectCounter.get());
-            countersNode.put("writeObjectCounter", writeObjectCounter.get());
-            countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
-            countersNode.put("copyObjectCounter", copyObjectCounter.get());
-            countersNode.put("multipartUploadCounter", 
multipartUploadCounter.get());
-            countersNode.put("multipartDownloadCounter", 
multipartDownloadCounter.get());
-            LOGGER.debug("Cloud request counters: {}", 
countersNode.toString());
+        if (LOGGER.isEnabled(LOG_LEVEL)) {
+            long currentTime = System.nanoTime();
+            if (currentTime - lastLogTimestamp >= logInterval) {
+                // Might log multiple times
+                lastLogTimestamp = currentTime;
+                ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+                countersNode.put("listObjectsCounter", 
listObjectsCounter.get());
+                countersNode.put("getObjectCounter", getObjectCounter.get());
+                countersNode.put("writeObjectCounter", 
writeObjectCounter.get());
+                countersNode.put("deleteObjectCounter", 
deleteObjectCounter.get());
+                countersNode.put("copyObjectCounter", copyObjectCounter.get());
+                countersNode.put("multipartUploadCounter", 
multipartUploadCounter.get());
+                countersNode.put("multipartDownloadCounter", 
multipartDownloadCounter.get());
+                LOGGER.log(LOG_LEVEL, "Cloud request counters: {}", 
countersNode.toString());
+            }
         }
     }
 }
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 01d3422294..fad8081d99 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -24,6 +24,7 @@ import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.util.StorageUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -65,7 +66,9 @@ public class LSMS3Test extends AbstractLSMTest {
         cleanup();
         
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
         LOGGER.info("Client created successfully");
-        S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, 
MOCK_SERVER_HOSTNAME, "", true, 0);
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
+        S3ClientConfig config =
+                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
"", true, 0, writeBufferSize);
         CLOUD_CLIENT = new S3CloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index ea12621cce..5c612dc718 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -56,7 +56,8 @@ public class CloudProperties extends AbstractProperties {
         CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 360),
         CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
         CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
-        CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
+        CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 5),
+        CLOUD_WRITE_BUFFER_SIZE(POSITIVE_INTEGER, 
StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE));
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -83,6 +84,7 @@ public class CloudProperties extends AbstractProperties {
                 case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
                 case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
                 case CLOUD_PROFILER_LOG_INTERVAL:
+                case CLOUD_WRITE_BUFFER_SIZE:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -139,6 +141,8 @@ public class CloudProperties extends AbstractProperties {
                     return "The waiting time (in minutes) to log cloud request 
statistics (default: 0, which means"
                             + " the profiler is disabled by default). The 
minimum is 1 minute."
                             + " NOTE: Enabling the profiler could perturb the 
performance of cloud requests";
+                case CLOUD_WRITE_BUFFER_SIZE:
+                    return "The write buffer size in bytes. (default: 8MB)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -213,4 +217,8 @@ public class CloudProperties extends AbstractProperties {
         long interval = 
TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
         return interval == 0 ? 0 : Math.max(interval, 
TimeUnit.MINUTES.toNanos(1));
     }
+
+    public int getWriteBufferSize() {
+        return accessor.getInt(Option.CLOUD_WRITE_BUFFER_SIZE);
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 6fe0eb89e4..5fbc7c6f30 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -32,6 +32,7 @@ import static 
org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.utils.PartitioningScheme;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -240,15 +241,17 @@ public class StorageProperties extends AbstractProperties 
{
         return 
accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES);
     }
 
-    public long getJobExecutionMemoryBudget() {
-        final long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() 
- getMemoryComponentGlobalBudget();
-        if (jobExecutionMemory <= 0) {
-            final String msg = String.format(
-                    "Invalid node memory configuration, more memory budgeted 
than available in JVM. Runtime max memory:"
-                            + " (%d), Buffer cache memory (%d), memory 
component global budget (%d)",
-                    MAX_HEAP_BYTES, getBufferCacheSize(), 
getMemoryComponentGlobalBudget());
-            throw new IllegalStateException(msg);
+    public long getJobExecutionMemoryBudget(INcApplicationContext 
runtimeContext) {
+        long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() - 
getMemoryComponentGlobalBudget();
+        if (runtimeContext.isCloudDeployment()) {
+            int numPartitions = 
runtimeContext.getIoManager().getIODevices().size();
+            int maxConcurrentMerges = getMaxConcurrentMerges(numPartitions);
+            int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions);
+            int writeBufferSize = 
runtimeContext.getCloudProperties().getWriteBufferSize();
+            jobExecutionMemory -= (long) (maxConcurrentFlushes + 
maxConcurrentMerges) * writeBufferSize;
+
         }
+        ensureJobExecutionMemory(jobExecutionMemory, runtimeContext);
         return jobExecutionMemory;
     }
 
@@ -260,7 +263,7 @@ public class StorageProperties extends AbstractProperties {
         return accessor.getString(Option.STORAGE_IO_SCHEDULER);
     }
 
-    public int geMaxConcurrentFlushes(int numPartitions) {
+    public int getMaxConcurrentFlushes(int numPartitions) {
         int value = 
accessor.getInt(Option.STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION);
         return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
     }
@@ -326,4 +329,23 @@ public class StorageProperties extends AbstractProperties {
     public long getStorageMaxComponentSize() {
         return accessor.getLong(Option.STORAGE_MAX_COMPONENT_SIZE);
     }
+
+    private void ensureJobExecutionMemory(long jobExecutionMemory, 
INcApplicationContext runtimeContext) {
+        if (jobExecutionMemory <= 0) {
+            String msg;
+            if (runtimeContext.isCloudDeployment()) {
+                msg = String.format(
+                        "Invalid node memory configuration, more memory 
budgeted than available in JVM. Runtime max memory:"
+                                + " (%d), Buffer cache memory (%d), memory 
component global budget (%d), cloud write buffer size (%d)",
+                        MAX_HEAP_BYTES, getBufferCacheSize(), 
getMemoryComponentGlobalBudget(),
+                        
runtimeContext.getCloudProperties().getWriteBufferSize());
+            } else {
+                msg = String.format(
+                        "Invalid node memory configuration, more memory 
budgeted than available in JVM. Runtime max memory:"
+                                + " (%d), Buffer cache memory (%d), memory 
component global budget (%d)",
+                        MAX_HEAP_BYTES, getBufferCacheSize(), 
getMemoryComponentGlobalBudget());
+            }
+            throw new IllegalStateException(msg);
+        }
+    }
 }

Reply via email to