This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e37072  STORM-3719 add config option for updateBlob interval (#3356)
6e37072 is described below

commit 6e3707242e0e6835085d358bec0abee4f62b1426
Author: agresch <[email protected]>
AuthorDate: Tue Dec 8 10:22:33 2020 -0600

    STORM-3719 add config option for updateBlob interval (#3356)
---
 conf/defaults.yaml                                                 | 1 +
 docs/distcache-blobstore.md                                        | 5 ++++-
 storm-server/src/main/java/org/apache/storm/DaemonConfig.java      | 7 +++++++
 .../src/main/java/org/apache/storm/localizer/AsyncLocalizer.java   | 7 ++++++-
 4 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8b59c0a..a0580a8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -139,6 +139,7 @@ supervisor.blobstore.download.thread.count: 5
 supervisor.blobstore.download.max_retries: 3
 supervisor.localizer.cache.target.size.mb: 10240
 supervisor.localizer.cleanup.interval.ms: 30000
+supervisor.localizer.update.blob.interval.secs: 30
 
 nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
 nimbus.blobstore.expiration.secs: 600
diff --git a/docs/distcache-blobstore.md b/docs/distcache-blobstore.md
index d078bc9..dfce552 100644
--- a/docs/distcache-blobstore.md
+++ b/docs/distcache-blobstore.md
@@ -290,7 +290,10 @@ appropriate runtime values for this worker. The 
distributed cache target size in
 of the distributed cache contents. It is set to 10240 MB.
 
 supervisor.localizer.cleanup.interval.ms: The distributed cache cleanup 
interval. Controls how often it scans to attempt to 
-cleanup anything over the cache target size. By default it is set to 600000 
milliseconds.
+cleanup anything over the cache target size. By default it is set to 300000 
milliseconds.
+
+supervisor.localizer.update.blob.interval.secs: The distributed cache interval 
for checking for blobs to update. By
+default it is set to 30 seconds.
 
 nimbus.blobstore.class:  Sets the blobstore implementation nimbus uses. It is 
set to "org.apache.storm.blobstore.LocalFsBlobStore"
 
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 4de3cf5..40d9e60 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -763,6 +763,13 @@ public class DaemonConfig implements Validated {
     public static final String SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS 
= "supervisor.localizer.cleanup.interval.ms";
 
     /**
+     * The distributed cache interval for checking for blobs to update.
+     */
+    @IsPositiveNumber
+    @IsInteger
+    public static final String SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS 
= "supervisor.localizer.update.blob.interval.secs";
+
+    /**
      * What blobstore download parallelism the supervisor should use.
      */
     @IsPositiveNumber
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index d899a8a..3d923ae 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -100,6 +100,7 @@ public class AsyncLocalizer implements AutoCloseable {
     private final ScheduledExecutorService downloadExecService;
     private final ScheduledExecutorService taskExecService;
     private final long cacheCleanupPeriod;
+    private final long updateBlobPeriod;
     private final StormMetricsRegistry metricsRegistry;
     // cleanup
     @VisibleForTesting
@@ -126,6 +127,9 @@ public class AsyncLocalizer implements AutoCloseable {
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 
1000).longValue();
 
+        updateBlobPeriod = ObjectReader.getInt(conf.get(
+                DaemonConfig.SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS), 
30).longValue();
+
         blobDownloadRetries = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
 
@@ -349,7 +353,8 @@ public class AsyncLocalizer implements AutoCloseable {
      * Start any background threads needed.  This includes updating blobs and 
cleaning up unused blobs over the configured size limit.
      */
     public void start() {
-        taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, 
TimeUnit.SECONDS);
+        LOG.debug("Scheduling updateBlobs every {} seconds", updateBlobPeriod);
+        taskExecService.scheduleWithFixedDelay(this::updateBlobs, 
updateBlobPeriod, updateBlobPeriod, TimeUnit.SECONDS);
         LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
         taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, 
cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     }

Reply via email to