This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 6e37072 STORM-3719 add config option for updateBlob interval (#3356)
6e37072 is described below
commit 6e3707242e0e6835085d358bec0abee4f62b1426
Author: agresch <[email protected]>
AuthorDate: Tue Dec 8 10:22:33 2020 -0600
STORM-3719 add config option for updateBlob interval (#3356)
---
conf/defaults.yaml | 1 +
docs/distcache-blobstore.md | 5 ++++-
storm-server/src/main/java/org/apache/storm/DaemonConfig.java | 7 +++++++
.../src/main/java/org/apache/storm/localizer/AsyncLocalizer.java | 7 ++++++-
4 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8b59c0a..a0580a8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -139,6 +139,7 @@ supervisor.blobstore.download.thread.count: 5
supervisor.blobstore.download.max_retries: 3
supervisor.localizer.cache.target.size.mb: 10240
supervisor.localizer.cleanup.interval.ms: 30000
+supervisor.localizer.update.blob.interval.secs: 30
nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
nimbus.blobstore.expiration.secs: 600
diff --git a/docs/distcache-blobstore.md b/docs/distcache-blobstore.md
index d078bc9..dfce552 100644
--- a/docs/distcache-blobstore.md
+++ b/docs/distcache-blobstore.md
@@ -290,7 +290,10 @@ appropriate runtime values for this worker. The
distributed cache target size in
of the distributed cache contents. It is set to 10240 MB.
supervisor.localizer.cleanup.interval.ms: The distributed cache cleanup
interval. Controls how often it scans to attempt to
-cleanup anything over the cache target size. By default it is set to 600000
milliseconds.
+cleanup anything over the cache target size. By default it is set to 300000
milliseconds.
+
+supervisor.localizer.update.blob.interval.secs: The distributed cache interval
for checking for blobs to update. By
+default it is set to 30 seconds.
nimbus.blobstore.class: Sets the blobstore implementation nimbus uses. It is
set to "org.apache.storm.blobstore.LocalFsBlobStore"
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 4de3cf5..40d9e60 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -763,6 +763,13 @@ public class DaemonConfig implements Validated {
public static final String SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS
= "supervisor.localizer.cleanup.interval.ms";
/**
+ * The distributed cache interval for checking for blobs to update.
+ */
+ @IsPositiveNumber
+ @IsInteger
+ public static final String SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS
= "supervisor.localizer.update.blob.interval.secs";
+
+ /**
* What blobstore download parallelism the supervisor should use.
*/
@IsPositiveNumber
diff --git
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index d899a8a..3d923ae 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -100,6 +100,7 @@ public class AsyncLocalizer implements AutoCloseable {
private final ScheduledExecutorService downloadExecService;
private final ScheduledExecutorService taskExecService;
private final long cacheCleanupPeriod;
+ private final long updateBlobPeriod;
private final StormMetricsRegistry metricsRegistry;
// cleanup
@VisibleForTesting
@@ -126,6 +127,9 @@ public class AsyncLocalizer implements AutoCloseable {
cacheCleanupPeriod = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 *
1000).longValue();
+ updateBlobPeriod = ObjectReader.getInt(conf.get(
+ DaemonConfig.SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS),
30).longValue();
+
blobDownloadRetries = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
@@ -349,7 +353,8 @@ public class AsyncLocalizer implements AutoCloseable {
* Start any background threads needed. This includes updating blobs and
cleaning up unused blobs over the configured size limit.
*/
public void start() {
- taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30,
TimeUnit.SECONDS);
+ LOG.debug("Scheduling updateBlobs every {} seconds", updateBlobPeriod);
+ taskExecService.scheduleWithFixedDelay(this::updateBlobs,
updateBlobPeriod, updateBlobPeriod, TimeUnit.SECONDS);
LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod,
cacheCleanupPeriod, TimeUnit.MILLISECONDS);
}