This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 93b55e0c04 [ASTERIXDB-3423][CONF] Make cloud write buffer size
configurable
93b55e0c04 is described below
commit 93b55e0c04bd6fbc85523383b9e9409e84628dbb
Author: Ali Alsuliman <[email protected]>
AuthorDate: Mon Jun 10 05:55:28 2024 +0300
[ASTERIXDB-3423][CONF] Make cloud write buffer size configurable
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Change the cloud write buffer size to 8MB.
- Enable the cloud profiler by setting the interval to 5 minutes.
- Make the cloud profiler logging level TRACE.
- Reserve the write buffer size from the jobExecutionMemory.
Change-Id: I41955440f0b3525a42e13ed03ff0909fc788e238
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18347
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Ali Alsuliman <[email protected]>
---
.../apache/asterix/app/nc/NCAppRuntimeContext.java | 2 +-
.../asterix/hyracks/bootstrap/NCApplication.java | 2 +-
.../cloud/clients/aws/s3/S3ClientConfig.java | 15 +++++---
.../cloud/clients/aws/s3/S3CloudClient.java | 4 ++-
.../clients/profiler/CountRequestProfiler.java | 30 +++++++++-------
.../org/apache/asterix/cloud/s3/LSMS3Test.java | 5 ++-
.../asterix/common/config/CloudProperties.java | 10 +++++-
.../asterix/common/config/StorageProperties.java | 40 +++++++++++++++++-----
8 files changed, 77 insertions(+), 31 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 18e24ab31c..7da3838838 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -697,7 +697,7 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
String schedulerName = storageProperties.getIoScheduler();
int numPartitions = ioManager.getIODevices().size();
- int maxConcurrentFlushes =
storageProperties.geMaxConcurrentFlushes(numPartitions);
+ int maxConcurrentFlushes =
storageProperties.getMaxConcurrentFlushes(numPartitions);
int maxScheduledMerges =
storageProperties.getMaxScheduledMerges(numPartitions);
int maxConcurrentMerges =
storageProperties.getMaxConcurrentMerges(numPartitions);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index d5659cf090..d02dc4fc09 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -338,7 +338,7 @@ public class NCApplication extends BaseNCApplication {
@Override
public NodeCapacity getCapacity() {
StorageProperties storageProperties =
runtimeContext.getStorageProperties();
- final long memorySize =
storageProperties.getJobExecutionMemoryBudget();
+ final long memorySize =
storageProperties.getJobExecutionMemoryBudget(runtimeContext);
int allCores = Runtime.getRuntime().availableProcessors();
return new NodeCapacity(memorySize, allCores);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 161fb37fc5..c98e0e8cc4 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -29,7 +29,7 @@ import
software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
public final class S3ClientConfig {
- static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
+
// The maximum number of file that can be deleted (AWS restriction)
static final int DELETE_BATCH_SIZE = 1000;
private final String region;
@@ -37,20 +37,22 @@ public final class S3ClientConfig {
private final String prefix;
private final boolean anonymousAuth;
private final long profilerLogInterval;
+ private final int writeBufferSize;
public S3ClientConfig(String region, String endpoint, String prefix,
boolean anonymousAuth,
- long profilerLogInterval) {
+ long profilerLogInterval, int writeBufferSize) {
this.region = region;
this.endpoint = endpoint;
this.prefix = prefix;
this.anonymousAuth = anonymousAuth;
this.profilerLogInterval = profilerLogInterval;
+ this.writeBufferSize = writeBufferSize;
}
public static S3ClientConfig of(CloudProperties cloudProperties) {
return new S3ClientConfig(cloudProperties.getStorageRegion(),
cloudProperties.getStorageEndpoint(),
cloudProperties.getStoragePrefix(),
cloudProperties.isStorageAnonymousAuth(),
- cloudProperties.getProfilerLogInterval());
+ cloudProperties.getProfilerLogInterval(),
cloudProperties.getWriteBufferSize());
}
public static S3ClientConfig of(Map<String, String> configuration) {
@@ -63,8 +65,9 @@ public final class S3ClientConfig {
String region = "";
String prefix = "";
boolean anonymousAuth = false;
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
- return new S3ClientConfig(region, endPoint, prefix, anonymousAuth,
profilerLogInterval);
+ return new S3ClientConfig(region, endPoint, prefix, anonymousAuth,
profilerLogInterval, writeBufferSize);
}
public String getRegion() {
@@ -92,6 +95,10 @@ public final class S3ClientConfig {
return profilerLogInterval;
}
+ public int getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
private boolean isS3Mock() {
return endpoint != null && !endpoint.isEmpty();
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 97169db507..2eae455ba6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -79,6 +79,7 @@ public final class S3CloudClient implements ICloudClient {
private final S3Client s3Client;
private final ICloudGuardian guardian;
private final IRequestProfiler profiler;
+ private final int writeBufferSize;
public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
this(config, buildClient(config), guardian);
@@ -88,6 +89,7 @@ public final class S3CloudClient implements ICloudClient {
this.config = config;
this.s3Client = s3Client;
this.guardian = guardian;
+ this.writeBufferSize = config.getWriteBufferSize();
long profilerInterval = config.getProfilerLogInterval();
if (profilerInterval > 0) {
profiler = new CountRequestProfiler(profilerInterval);
@@ -99,7 +101,7 @@ public final class S3CloudClient implements ICloudClient {
@Override
public int getWriteBufferSize() {
- return S3ClientConfig.WRITE_BUFFER_SIZE;
+ return writeBufferSize;
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
index d5fc2db169..624395bb3f 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -20,6 +20,7 @@ package org.apache.asterix.cloud.clients.profiler;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -29,6 +30,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
public class CountRequestProfiler implements IRequestProfiler {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOGGER = LogManager.getLogger();
+ private static final Level LOG_LEVEL = Level.TRACE;
private final long logInterval;
private final AtomicLong listObjectsCounter;
private final AtomicLong getObjectCounter;
@@ -94,19 +96,21 @@ public class CountRequestProfiler implements
IRequestProfiler {
}
private void log() {
- long currentTime = System.nanoTime();
- if (currentTime - lastLogTimestamp >= logInterval) {
- // Might log multiple times
- lastLogTimestamp = currentTime;
- ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
- countersNode.put("listObjectsCounter", listObjectsCounter.get());
- countersNode.put("getObjectCounter", getObjectCounter.get());
- countersNode.put("writeObjectCounter", writeObjectCounter.get());
- countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
- countersNode.put("copyObjectCounter", copyObjectCounter.get());
- countersNode.put("multipartUploadCounter",
multipartUploadCounter.get());
- countersNode.put("multipartDownloadCounter",
multipartDownloadCounter.get());
- LOGGER.debug("Cloud request counters: {}",
countersNode.toString());
+ if (LOGGER.isEnabled(LOG_LEVEL)) {
+ long currentTime = System.nanoTime();
+ if (currentTime - lastLogTimestamp >= logInterval) {
+ // Might log multiple times
+ lastLogTimestamp = currentTime;
+ ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+ countersNode.put("listObjectsCounter",
listObjectsCounter.get());
+ countersNode.put("getObjectCounter", getObjectCounter.get());
+ countersNode.put("writeObjectCounter",
writeObjectCounter.get());
+ countersNode.put("deleteObjectCounter",
deleteObjectCounter.get());
+ countersNode.put("copyObjectCounter", copyObjectCounter.get());
+ countersNode.put("multipartUploadCounter",
multipartUploadCounter.get());
+ countersNode.put("multipartDownloadCounter",
multipartDownloadCounter.get());
+ LOGGER.log(LOG_LEVEL, "Cloud request counters: {}",
countersNode.toString());
+ }
}
}
}
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 01d3422294..fad8081d99 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -24,6 +24,7 @@ import org.apache.asterix.cloud.AbstractLSMTest;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.util.StorageUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -65,7 +66,9 @@ public class LSMS3Test extends AbstractLSMTest {
cleanup();
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
LOGGER.info("Client created successfully");
- S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION,
MOCK_SERVER_HOSTNAME, "", true, 0);
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
+ S3ClientConfig config =
+ new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME,
"", true, 0, writeBufferSize);
CLOUD_CLIENT = new S3CloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index ea12621cce..5c612dc718 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -56,7 +56,8 @@ public class CloudProperties extends AbstractProperties {
CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 360),
CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT,
StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
- CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
+ CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 5),
+ CLOUD_WRITE_BUFFER_SIZE(POSITIVE_INTEGER,
StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE));
private final IOptionType interpreter;
private final Object defaultValue;
@@ -83,6 +84,7 @@ public class CloudProperties extends AbstractProperties {
case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
case CLOUD_PROFILER_LOG_INTERVAL:
+ case CLOUD_WRITE_BUFFER_SIZE:
return Section.COMMON;
default:
return Section.NC;
@@ -139,6 +141,8 @@ public class CloudProperties extends AbstractProperties {
return "The waiting time (in minutes) to log cloud request
statistics (default: 0, which means"
+ " the profiler is disabled by default). The
minimum is 1 minute."
+ " NOTE: Enabling the profiler could perturb the
performance of cloud requests";
+ case CLOUD_WRITE_BUFFER_SIZE:
+ return "The write buffer size in bytes. (default: 8MB)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -213,4 +217,8 @@ public class CloudProperties extends AbstractProperties {
long interval =
TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
return interval == 0 ? 0 : Math.max(interval,
TimeUnit.MINUTES.toNanos(1));
}
+
+ public int getWriteBufferSize() {
+ return accessor.getInt(Option.CLOUD_WRITE_BUFFER_SIZE);
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 6fe0eb89e4..5fbc7c6f30 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -32,6 +32,7 @@ import static
org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -240,15 +241,17 @@ public class StorageProperties extends AbstractProperties
{
return
accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES);
}
- public long getJobExecutionMemoryBudget() {
- final long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize()
- getMemoryComponentGlobalBudget();
- if (jobExecutionMemory <= 0) {
- final String msg = String.format(
- "Invalid node memory configuration, more memory budgeted
than available in JVM. Runtime max memory:"
- + " (%d), Buffer cache memory (%d), memory
component global budget (%d)",
- MAX_HEAP_BYTES, getBufferCacheSize(),
getMemoryComponentGlobalBudget());
- throw new IllegalStateException(msg);
+ public long getJobExecutionMemoryBudget(INcApplicationContext
runtimeContext) {
+ long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() -
getMemoryComponentGlobalBudget();
+ if (runtimeContext.isCloudDeployment()) {
+ int numPartitions =
runtimeContext.getIoManager().getIODevices().size();
+ int maxConcurrentMerges = getMaxConcurrentMerges(numPartitions);
+ int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions);
+ int writeBufferSize =
runtimeContext.getCloudProperties().getWriteBufferSize();
+ jobExecutionMemory -= (long) (maxConcurrentFlushes +
maxConcurrentMerges) * writeBufferSize;
+
}
+ ensureJobExecutionMemory(jobExecutionMemory, runtimeContext);
return jobExecutionMemory;
}
@@ -260,7 +263,7 @@ public class StorageProperties extends AbstractProperties {
return accessor.getString(Option.STORAGE_IO_SCHEDULER);
}
- public int geMaxConcurrentFlushes(int numPartitions) {
+ public int getMaxConcurrentFlushes(int numPartitions) {
int value =
accessor.getInt(Option.STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION);
return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
}
@@ -326,4 +329,23 @@ public class StorageProperties extends AbstractProperties {
public long getStorageMaxComponentSize() {
return accessor.getLong(Option.STORAGE_MAX_COMPONENT_SIZE);
}
+
+ private void ensureJobExecutionMemory(long jobExecutionMemory,
INcApplicationContext runtimeContext) {
+ if (jobExecutionMemory <= 0) {
+ String msg;
+ if (runtimeContext.isCloudDeployment()) {
+ msg = String.format(
+ "Invalid node memory configuration, more memory
budgeted than available in JVM. Runtime max memory:"
+ + " (%d), Buffer cache memory (%d), memory
component global budget (%d), cloud write buffer size (%d)",
+ MAX_HEAP_BYTES, getBufferCacheSize(),
getMemoryComponentGlobalBudget(),
+
runtimeContext.getCloudProperties().getWriteBufferSize());
+ } else {
+ msg = String.format(
+ "Invalid node memory configuration, more memory
budgeted than available in JVM. Runtime max memory:"
+ + " (%d), Buffer cache memory (%d), memory
component global budget (%d)",
+ MAX_HEAP_BYTES, getBufferCacheSize(),
getMemoryComponentGlobalBudget());
+ }
+ throw new IllegalStateException(msg);
+ }
+ }
}