This is an automated email from the ASF dual-hosted git repository.

luochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ebbcaa6  [ASTERIXDB-2540][STO] Introduce Disk Write Rate Limiter
ebbcaa6 is described below

commit ebbcaa67cdbcf813ccf87b1df864a4428e05a191
Author: luochen <[email protected]>
AuthorDate: Tue Oct 6 09:22:58 2020 -0700

    [ASTERIXDB-2540][STO] Introduce Disk Write Rate Limiter
    
    - user model changes: yes. Add a new storage option:
         storage.write.rate.limit (default 0)
    - storage format changes: no.
    - interface changes: no.
    
    Details:
    - Introduce a disk write rate limiting mechanism to bound the maximum
    disk write bandwidth usage of large merges.
    - Disk write limiting is performed for each partition. No change to the
    storage format.
    
    Change-Id: If3cb3df1b3c3b4bbee1ba9ec8ab67c357873ef44
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3455
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java | 10 ++++
 .../common/api/IDatasetLifecycleManager.java       | 11 ++++
 .../common/api/IDiskWriteRateLimiterProvider.java  | 24 +++------
 .../asterix/common/api/INcApplicationContext.java  |  5 ++
 .../asterix/common/config/StorageProperties.java   |  9 +++-
 .../common/context/DatasetLifecycleManager.java    | 18 +++++++
 .../asterix/common/context/DatasetResource.java    | 14 +++++
 .../context/DiskWriteRateLimiterProvider.java      | 52 ++++++++++++++++++
 .../LSMIndexPageWriteCallbackFactory.java          |  9 +++-
 .../common/impls/LSMIndexPageWriteCallback.java    | 10 +++-
 .../hyracks/hyracks-storage-common/pom.xml         |  4 ++
 .../common/buffercache/FIFOLocalWriter.java        |  1 +
 .../common/buffercache/IPageWriteCallback.java     |  8 +++
 .../{IPageWriteCallback.java => IRateLimiter.java} | 18 ++-----
 ...PageWriteCallback.java => NoOpRateLimiter.java} | 30 +++++------
 .../common/buffercache/SleepRateLimiter.java       | 62 ++++++++++++++++++++++
 .../lsm/btree/LSMBTreePageWriteCallbackTest.java   | 21 +++++++-
 17 files changed, 254 insertions(+), 52 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e804d60..a702381 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,6 +33,7 @@ import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
@@ -49,6 +50,7 @@ import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
 import org.apache.asterix.common.context.GlobalVirtualBufferCache;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -153,6 +155,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private IReceptionist receptionist;
     private ICacheManager cacheManager;
     private IConfigValidator configValidator;
+    private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, 
NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory) {
@@ -287,6 +290,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
         lccm.register(txnSubsystem.getCheckpointManager());
         lccm.register(libraryManager);
+
+        diskWriteRateLimiterProvider = new DiskWriteRateLimiterProvider();
     }
 
     @Override
@@ -593,4 +598,9 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         }
         return ioScheduler;
     }
+
+    @Override
+    public IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider() {
+        return diskWriteRateLimiterProvider;
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index f7cbf18..7b737a0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IIndex> {
     /**
@@ -104,6 +105,16 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
     ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int 
partition, String path);
 
     /**
+     * creates (if necessary) and returns the rate limiter of a dataset partition.
+     *
+     * @param datasetId
+     * @param partition
+     * @param writeRateLimit
+     * @return
+     */
+    IRateLimiter getRateLimiter(int datasetId, int partition, long 
writeRateLimit);
+
+    /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
      *
      * @param datasetId
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
similarity index 65%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
copy to 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
index 3d2bda9..becd655 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
@@ -16,25 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.buffercache;
+package org.apache.asterix.common.api;
 
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
-
-public interface IPageWriteCallback {
-    /**
-     * Initialization
-     *
-     * @param bulkLoader
-     */
-    void initialize(IIndexBulkLoader bulkLoader);
-
-    /**
-     * Notify that a page has been written
-     *
-     * @param page
-     * @throws HyracksDataException
-     */
-    void afterWrite(ICachedPage page) throws HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
+public interface IDiskWriteRateLimiterProvider {
+    IRateLimiter getRateLimiter(INCServiceContext serviceCtx, IResource 
resource) throws HyracksDataException;
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 65b587b..8c82979 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -140,4 +140,9 @@ public interface INcApplicationContext extends 
IApplicationContext {
      * @return the library manager
      */
     ILibraryManager getLibraryManager();
+
+    /**
+     * @return the disk write rate limiter provider
+     */
+    IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index adf57fd..22393ec 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -54,7 +54,8 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
         STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
         STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
-        STORAGE_IO_SCHEDULER(STRING, "greedy");
+        STORAGE_IO_SCHEDULER(STRING, "greedy"),
+        STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -104,6 +105,8 @@ public class StorageProperties extends AbstractProperties {
                     return "The maximum acceptable false positive rate for 
bloom filters associated with LSM indexes";
                 case STORAGE_COMPRESSION_BLOCK:
                     return "The default compression scheme for the storage";
+                case STORAGE_WRITE_RATE_LIMIT:
+                    return "The maximum disk write rate (bytes/s) for each 
storage partition (disabled if the provided value <= 0)";
                 case STORAGE_DISK_FORCE_BYTES:
                     return "The number of bytes before each disk force 
(fsync)";
                 case STORAGE_IO_SCHEDULER:
@@ -209,6 +212,10 @@ public class StorageProperties extends AbstractProperties {
         return SYSTEM_RESERVED_DATASETS;
     }
 
+    public long getWriteRateLimit() {
+        return accessor.getLong(Option.STORAGE_WRITE_RATE_LIMIT);
+    }
+
     public int getDiskForcePages() {
         return (int) (accessor.getLong(Option.STORAGE_DISK_FORCE_BYTES) / 
getBufferCachePageSize());
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 5ea79b3..b26220d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -55,6 +55,8 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -310,6 +312,16 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     }
 
     @Override
+    public synchronized IRateLimiter getRateLimiter(int datasetId, int 
partition, long writeRateLimit) {
+        DatasetResource dataset = datasets.get(datasetId);
+        IRateLimiter rateLimiter = dataset.getRateLimiter(partition);
+        if (rateLimiter == null) {
+            rateLimiter = populateRateLimiter(dataset, partition, 
writeRateLimit);
+        }
+        return rateLimiter;
+    }
+
+    @Override
     public synchronized boolean isRegistered(int datasetId) {
         return datasets.containsKey(datasetId);
     }
@@ -324,6 +336,12 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         dataset.setIdGenerator(partition, idGenerator);
     }
 
+    private IRateLimiter populateRateLimiter(DatasetResource dataset, int 
partition, long writeRateLimit) {
+        IRateLimiter rateLimiter = SleepRateLimiter.create(writeRateLimit);
+        dataset.setRateLimiter(partition, rateLimiter);
+        return rateLimiter;
+    }
+
     private void validateDatasetLifecycleManagerState() throws 
HyracksDataException {
         if (stopped) {
             throw new 
HyracksDataException(DatasetLifecycleManager.class.getSimpleName() + " was 
stopped.");
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8844d41..54e1976 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 /**
  * A dataset can be in one of two states { EVICTED , LOADED }.
@@ -46,11 +47,13 @@ public class DatasetResource implements 
Comparable<DatasetResource> {
 
     private final Map<Integer, PrimaryIndexOperationTracker> 
datasetPrimaryOpTrackers;
     private final Map<Integer, ILSMComponentIdGenerator> 
datasetComponentIdGenerators;
+    private final Map<Integer, IRateLimiter> datasetRateLimiters;
 
     public DatasetResource(DatasetInfo datasetInfo) {
         this.datasetInfo = datasetInfo;
         this.datasetPrimaryOpTrackers = new HashMap<>();
         this.datasetComponentIdGenerators = new HashMap<>();
+        this.datasetRateLimiters = new HashMap<>();
     }
 
     public boolean isRegistered() {
@@ -124,6 +127,10 @@ public class DatasetResource implements 
Comparable<DatasetResource> {
         return datasetComponentIdGenerators.get(partition);
     }
 
+    public IRateLimiter getRateLimiter(int partition) {
+        return datasetRateLimiters.get(partition);
+    }
+
     public void setPrimaryIndexOperationTracker(int partition, 
PrimaryIndexOperationTracker opTracker) {
         if (datasetPrimaryOpTrackers.containsKey(partition)) {
             throw new IllegalStateException(
@@ -139,6 +146,13 @@ public class DatasetResource implements 
Comparable<DatasetResource> {
         datasetComponentIdGenerators.put(partition, idGenerator);
     }
 
+    public void setRateLimiter(int partition, IRateLimiter rateLimiter) {
+        if (datasetRateLimiters.containsKey(partition)) {
+            throw new IllegalStateException("RateLimiter has already been set 
for partition " + partition);
+        }
+        datasetRateLimiters.put(partition, rateLimiter);
+    }
+
     @Override
     public int compareTo(DatasetResource o) {
         return datasetInfo.compareTo(o.datasetInfo);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
new file mode 100644
index 0000000..7d07cfa
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+
+public class DiskWriteRateLimiterProvider implements 
IDiskWriteRateLimiterProvider {
+    // stores the write rate limiter for each NC partition
+    private final Map<Integer, IRateLimiter> limiters = new HashMap<>();
+
+    @Override
+    public synchronized IRateLimiter getRateLimiter(INCServiceContext 
serviceCtx, IResource resource)
+            throws HyracksDataException {
+        int partition = 
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        IRateLimiter limiter = limiters.get(partition);
+        if (limiter == null) {
+            INcApplicationContext appCtx = (INcApplicationContext) 
serviceCtx.getApplicationContext();
+            long writeRateLimit = 
appCtx.getStorageProperties().getWriteRateLimit();
+            limiter = SleepRateLimiter.create(writeRateLimit);
+            limiters.put(partition, limiter);
+        }
+        return limiter;
+    }
+
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
index 27f18cf..7452ae6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexPageWriteCallback;
 import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -38,15 +39,21 @@ public class LSMIndexPageWriteCallbackFactory implements 
ILSMPageWriteCallbackFa
 
     protected transient int pagesPerForce;
 
+    protected transient IRateLimiter rateLimiter;
+
+    public LSMIndexPageWriteCallbackFactory() {
+    }
+
     @Override
     public void initialize(INCServiceContext ncCtx, IResource resource) throws 
HyracksDataException {
         INcApplicationContext appCtx = (INcApplicationContext) 
ncCtx.getApplicationContext();
         pagesPerForce = appCtx.getStorageProperties().getDiskForcePages();
+        rateLimiter = 
appCtx.getDiskWriteRateLimiterProvider().getRateLimiter(ncCtx, resource);
     }
 
     @Override
     public IPageWriteCallback createPageWriteCallback() throws 
HyracksDataException {
-        return new LSMIndexPageWriteCallback(pagesPerForce);
+        return new LSMIndexPageWriteCallback(rateLimiter, pagesPerForce);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
index 991968f..0ad7033 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
@@ -22,15 +22,18 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 public class LSMIndexPageWriteCallback implements IPageWriteCallback {
 
+    private final IRateLimiter rateLimiter;
     private final int pagesPerForce;
     private IIndexBulkLoader bulkLoader;
     private long totalWrittenPages;
     private int totalForces;
 
-    public LSMIndexPageWriteCallback(int pagesPerForce) {
+    public LSMIndexPageWriteCallback(IRateLimiter rateLimiter, int 
pagesPerForce) {
+        this.rateLimiter = rateLimiter;
         this.pagesPerForce = pagesPerForce;
     }
 
@@ -39,10 +42,15 @@ public class LSMIndexPageWriteCallback implements 
IPageWriteCallback {
         this.bulkLoader = bulkLoader;
     }
 
+    public void beforeWrite(ICachedPage page) throws HyracksDataException {
+        rateLimiter.request(page.getFrameSizeMultiplier());
+    }
+
     @Override
     public void afterWrite(ICachedPage page) throws HyracksDataException {
         totalWrittenPages++;
         if (pagesPerForce > 0 && totalWrittenPages % pagesPerForce == 0) {
+            // perform a force
             bulkLoader.force();
             totalForces++;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index eb453f6..d0dfbf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -71,5 +71,9 @@
       <artifactId>snappy-java</artifactId>
       <version>1.1.7.1</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 5f8bb4a..e74fe59 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -40,6 +40,7 @@ public class FIFOLocalWriter implements IFIFOPageWriter {
     public void write(ICachedPage page) {
         CachedPage cPage = (CachedPage) page;
         try {
+            callback.beforeWrite(cPage);
             bufferCache.write(cPage);
             callback.afterWrite(cPage);
         } catch (Exception e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
index 3d2bda9..f4ea6cf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
@@ -30,6 +30,14 @@ public interface IPageWriteCallback {
     void initialize(IIndexBulkLoader bulkLoader);
 
     /**
+     * Notify that a page is about to be written
+     *
+     * @param page
+     * @throws HyracksDataException
+     */
+    void beforeWrite(ICachedPage page) throws HyracksDataException;
+
+    /**
      * Notify that a page has been written
      *
      * @param page
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
similarity index 69%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
copy to 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
index 3d2bda9..b7433ae 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
@@ -19,22 +19,10 @@
 package org.apache.hyracks.storage.common.buffercache;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
-public interface IPageWriteCallback {
-    /**
-     * Initialization
-     *
-     * @param bulkLoader
-     */
-    void initialize(IIndexBulkLoader bulkLoader);
+public interface IRateLimiter {
 
-    /**
-     * Notify that a page has been written
-     *
-     * @param page
-     * @throws HyracksDataException
-     */
-    void afterWrite(ICachedPage page) throws HyracksDataException;
+    void setRate(double ratePerSecond);
 
+    void request(int permits) throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
similarity index 69%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
copy to 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
index 3d2bda9..ac0a1a9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
@@ -19,22 +19,22 @@
 package org.apache.hyracks.storage.common.buffercache;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
-public interface IPageWriteCallback {
-    /**
-     * Initialization
-     *
-     * @param bulkLoader
-     */
-    void initialize(IIndexBulkLoader bulkLoader);
+public class NoOpRateLimiter implements IRateLimiter {
 
-    /**
-     * Notify that a page has been written
-     *
-     * @param page
-     * @throws HyracksDataException
-     */
-    void afterWrite(ICachedPage page) throws HyracksDataException;
+    public static final NoOpRateLimiter INSTANCE = new NoOpRateLimiter();
+
+    private NoOpRateLimiter() {
+    }
+
+    @Override
+    public void setRate(double ratePerSecond) {
+        // no op
+    }
+
+    @Override
+    public void request(int tokens) throws HyracksDataException {
+        // no op
+    }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
new file mode 100644
index 0000000..4d0ca92
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * A wrapper around the Guava {@link RateLimiter} implementation
+ *
+ */
+public class SleepRateLimiter implements IRateLimiter {
+    /**
+     * Defines the maximum storage capacity of the rate limiter, i.e., the 
number of permits it stores.
+     * This may be made configurable in the future.
+     */
+    private static final double MAX_BURST_SECONDS = 1.0;
+
+    private final RateLimiter rateLimiterImpl;
+
+    public static IRateLimiter create(long ratePerSecond) {
+        if (ratePerSecond > 0) {
+            return new SleepRateLimiter(ratePerSecond, MAX_BURST_SECONDS);
+        } else {
+            return NoOpRateLimiter.INSTANCE;
+        }
+    }
+
+    public SleepRateLimiter(long ratePerSecond, double maxBurstSeconds) {
+        rateLimiterImpl = RateLimiter.create(ratePerSecond, (long) 
(maxBurstSeconds * 1000), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setRate(double ratePerSecond) {
+        rateLimiterImpl.setRate(ratePerSecond);
+    }
+
+    @Override
+    public void request(int permits) throws HyracksDataException {
+        rateLimiterImpl.acquire(permits);
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
index c0dc6b0..4e59ab0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
@@ -42,6 +42,8 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexPageWriteCallback;
 import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -55,7 +57,22 @@ public class LSMBTreePageWriteCallbackTest extends 
OrderedIndexTestDriver {
 
     private final int PAGES_PER_FORCE = 16;
 
+    private int pageCounter = 0;
     private LSMIndexPageWriteCallback lastCallback = null;
+    private final IRateLimiter testLimiter = new IRateLimiter() {
+        IRateLimiter limiter = SleepRateLimiter.create(100 * 1000);
+
+        @Override
+        public void setRate(double ratePerSecond) {
+
+        }
+
+        @Override
+        public void request(int permits) throws HyracksDataException {
+            limiter.request(permits);
+            pageCounter++;
+        }
+    };
 
     private final ILSMPageWriteCallbackFactory pageWriteCallbackFactory = new 
ILSMPageWriteCallbackFactory() {
         private static final long serialVersionUID = 1L;
@@ -67,7 +84,7 @@ public class LSMBTreePageWriteCallbackTest extends 
OrderedIndexTestDriver {
 
         @Override
         public IPageWriteCallback createPageWriteCallback() throws 
HyracksDataException {
-            lastCallback = new LSMIndexPageWriteCallback(PAGES_PER_FORCE);
+            lastCallback = new LSMIndexPageWriteCallback(testLimiter, 
PAGES_PER_FORCE);
             return lastCallback;
         }
     };
@@ -134,6 +151,7 @@ public class LSMBTreePageWriteCallbackTest extends 
OrderedIndexTestDriver {
                     ctx.getIndex().activate();
                 }
             }
+            pageCounter = 0;
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) 
ctx.getIndexAccessor();
             ILSMIOOperation mergeOp = accessor.scheduleMerge(((LSMBTree) 
ctx.getIndex()).getDiskComponents());
             mergeOp.addCompleteListener(op -> {
@@ -141,6 +159,7 @@ public class LSMBTreePageWriteCallbackTest extends 
OrderedIndexTestDriver {
                     long numPages = op.getNewComponent().getComponentSize()
                             / 
harness.getDiskBufferCache().getPageSizeWithHeader() - 1;
                     // we skipped the metadata page for simplicity
+                    Assert.assertEquals(numPages, pageCounter);
                     Assert.assertEquals(numPages / PAGES_PER_FORCE, 
lastCallback.getTotalForces());
                 }
             });

Reply via email to