This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 8f1cd017b710c6de4bf5f1b480f8daaeb06de388 Author: luochen <[email protected]> AuthorDate: Thu Dec 17 13:40:55 2020 -0800 [ASTERIXDB-2813] Limit the number of flush/merge threads - user model changes: no - storage format changes: no - interface changes: yes. Details: - Limit the number of flush/merge threads by introducing the following parameters. - storage.max.running.flushes.per.partition: the maximum number of running flushes for each partition. - storage.max.scheduled.merge.per.partition: the maximum number of scheduled merges for each partition. This is mainly used by the greedy scheduler. - storage.max.running.merges.per.partition: the maximum number of running mergese per partition. - Basically, we limit the number of flush/merge threads and put newly created flush/merge opreations into a wait queue if the limit is reached. - For the greedy scheduler, the scheduled merges (i.e., merge threads) are more than the running merges so that the scheduler can pick the smallest merge for each LSM-tree. Change-Id: I85a55423a1438b1d534c2e6a5968e675a99884c8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9183 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 12 +- .../test/dataflow/LSMFlushRecoveryTest.java | 2 +- .../asterix/common/config/StorageProperties.java | 26 +- .../storage/am/lsm/common/api/ILSMIOOperation.java | 4 + .../api/ILSMIOOperationSchedulerFactory.java | 3 +- .../impls/AbstractAsynchronousScheduler.java | 96 ++++++-- .../am/lsm/common/impls/AbstractIoOperation.java | 5 + .../am/lsm/common/impls/AsynchronousScheduler.java | 37 ++- .../am/lsm/common/impls/GreedyScheduler.java | 118 ++++++--- .../am/lsm/common/impls/IoOperationExecutor.java | 29 +-- .../am/lsm/common/impls/NoOpIoOperation.java | 5 + .../am/lsm/common/impls/TracedIOOperation.java | 5 + .../lsm/btree/LSMBTreeComponentLifecycleTest.java | 3 +- .../storage/am/lsm/btree/perf/LSMTreeRunner.java | 2 +- .../am/lsm/common/test/GreedySchedulerTest.java | 133 ---------- .../am/lsm/common/test/IoSchedulerTest.java | 267 +++++++++++++++++++++ 16 files changed, 522 insertions(+), 225 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index e058d39..a9a3a3e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -582,20 +582,26 @@ public class NCAppRuntimeContext implements INcApplicationContext { private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties) { String schedulerName = storageProperties.getIoScheduler(); + int numPartitions = ioManager.getIODevices().size(); + + int maxRunningFlushes = storageProperties.getMaxRunningFlushes(numPartitions); + int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions); + int maxRunningMerges = storageProperties.getMaxRunningMerges(numPartitions); + ILSMIOOperationScheduler ioScheduler = null; if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) { ioScheduler = AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(), - HaltCallback.INSTANCE); + HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges); } else if (GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) { ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(), - HaltCallback.INSTANCE); + HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges); } else { if (LOGGER.isWarnEnabled()) { LOGGER.log(Level.WARN, "Unknown storage I/O scheduler: " + schedulerName + "; defaulting to greedy I/O scheduler."); } ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(), - HaltCallback.INSTANCE); + HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges); } return ioScheduler; } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java index fe73baf..c3a6839 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java @@ -173,7 +173,7 @@ public class LSMFlushRecoveryTest { public void operationFailed(ILSMIOOperation operation, Throwable t) { LOGGER.warn("IO Operation failed", t); } - })); + }, Integer.MAX_VALUE, Integer.MAX_VALUE)); dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java index a99a306..fc33b1a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java @@ -55,7 +55,10 @@ public class StorageProperties extends AbstractProperties { STORAGE_COMPRESSION_BLOCK(STRING, "snappy"), STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)), STORAGE_IO_SCHEDULER(STRING, "greedy"), - STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l); + STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l), + STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2), + STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8), + STORAGE_MAX_RUNNING_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2); private final IOptionType interpreter; private final Object defaultValue; @@ -111,6 +114,12 @@ public class StorageProperties extends AbstractProperties { return "The number of bytes before each disk force (fsync)"; case STORAGE_IO_SCHEDULER: return "The I/O scheduler for LSM flush and merge operations"; + case STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION: + return "The maximum number of running flushes per partition (0 means unlimited)"; + case STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION: + return "The maximum number of scheduled merges per partition (0 means unlimited)"; + case STORAGE_MAX_RUNNING_MERGES_PER_PARTITION: + return "The maximum number of running merges per partition (0 means unlimited)"; default: throw new IllegalStateException("NYI: " + this); } @@ -204,6 +213,21 @@ public class StorageProperties extends AbstractProperties { return accessor.getString(Option.STORAGE_IO_SCHEDULER); } + public int getMaxRunningFlushes(int numPartitions) { + int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION); + return value != 0 ? value * numPartitions : Integer.MAX_VALUE; + } + + public int getMaxScheduledMerges(int numPartitions) { + int value = accessor.getInt(Option.STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION); + return value != 0 ? value * numPartitions : Integer.MAX_VALUE; + } + + public int getMaxRunningMerges(int numPartitions) { + int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_MERGES_PER_PARTITION); + return value != 0 ? value * numPartitions : Integer.MAX_VALUE; + } + protected int getMetadataDatasets() { return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index 753d27a..40998b7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -177,4 +177,8 @@ public interface ILSMIOOperation extends Callable<LSMIOOperationStatus>, IPageWr */ boolean isActive(); + /** + * @return whether this IO operation is completed + */ + boolean isCompleted(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java index 1c8a4e1..36bfc5a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java @@ -21,7 +21,8 @@ package org.apache.hyracks.storage.am.lsm.common.api; import java.util.concurrent.ThreadFactory; public interface ILSMIOOperationSchedulerFactory { - ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback); + ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback, + int maxNumRunningFlushes, int maxNumScheduledMerges, int maxNumRunningMerges); String getName(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java index 78185f0..e266a6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; @@ -34,13 +35,18 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationScheduler, Closeable { protected final ExecutorService executor; + + private final int maxNumFlushes; protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>(); - protected final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations = new HashMap<>(); + protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>(); + protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>(); + protected final Map<String, Throwable> failedGroups = new HashMap<>(); - public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback) { - executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations, - waitingFlushOperations, failedGroups); + public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback, + int maxNumFlushes) { + executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations, failedGroups); + this.maxNumFlushes = maxNumFlushes; } @Override @@ -61,27 +67,35 @@ public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationSc } } + @Override + public void completeOperation(ILSMIOOperation operation) throws HyracksDataException { + switch (operation.getIOOpertionType()) { + case FLUSH: + completeFlush(operation); + break; + case MERGE: + completeMerge(operation); + case NOOP: + return; + default: + // this should never happen + // just guard here to avoid silent failures in case of future extensions + throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType()); + } + } + protected abstract void scheduleMerge(ILSMIOOperation operation); + protected abstract void completeMerge(ILSMIOOperation operation); + protected void scheduleFlush(ILSMIOOperation operation) { String id = operation.getIndexIdentifier(); synchronized (executor) { - if (failedGroups.containsKey(id)) { - // Group failure. Fail the operation right away - operation.setStatus(LSMIOOperationStatus.FAILURE); - operation.setFailure(new RuntimeException("Operation group " + id + " has permanently failed", - failedGroups.get(id))); - operation.complete(); + if (checkFailedFlush(operation)) { return; } - if (runningFlushOperations.containsKey(id)) { - if (waitingFlushOperations.containsKey(id)) { - waitingFlushOperations.get(id).offer(operation); - } else { - Deque<ILSMIOOperation> q = new ArrayDeque<>(); - q.offer(operation); - waitingFlushOperations.put(id, q); - } + if (runningFlushOperations.size() >= maxNumFlushes || runningFlushOperations.containsKey(id)) { + waitingFlushOperations.add(operation); } else { runningFlushOperations.put(id, operation); executor.submit(operation); @@ -89,6 +103,52 @@ public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationSc } } + private boolean checkFailedFlush(ILSMIOOperation operation) { + String id = operation.getIndexIdentifier(); + if (failedGroups.containsKey(id)) { + // Group failure. Fail the operation right away + operation.setStatus(LSMIOOperationStatus.FAILURE); + operation.setFailure( + new RuntimeException("Operation group " + id + " has permanently failed", failedGroups.get(id))); + operation.complete(); + return true; + } else { + return false; + } + } + + private void completeFlush(ILSMIOOperation operation) { + String id = operation.getIndexIdentifier(); + synchronized (executor) { + runningFlushOperations.remove(id); + + // Schedule flushes in FIFO order. Must make sure that there is at most one scheduled flush for each index. + for (ILSMIOOperation flushOp : waitingFlushOperations) { + String flushOpId = flushOp.getIndexIdentifier(); + if (runningFlushOperations.size() < maxNumFlushes) { + if (!runningFlushOperations.containsKey(flushOpId) && !flushOp.isCompleted() + && !checkFailedFlush(flushOp)) { + runningFlushOperations.put(flushOpId, flushOp); + executor.submit(flushOp); + } + } else { + break; + } + } + + // cleanup scheduled flushes + while (!waitingFlushOperations.isEmpty()) { + ILSMIOOperation top = waitingFlushOperations.peek(); + if (top.isCompleted() || runningFlushOperations.get(top.getIndexIdentifier()) == top) { + waitingFlushOperations.poll(); + } else { + break; + } + } + + } + } + @Override public void close() throws IOException { executor.shutdown(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index 0938b5f..8317ca7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -202,6 +202,11 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { return isActive.get(); } + @Override + public synchronized boolean isCompleted() { + return completed; + } + public void waitIfPaused() throws HyracksDataException { synchronized (this) { while (!isActive.get()) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java index ac3481c..afd9a49 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java @@ -35,26 +35,49 @@ public class AsynchronousScheduler extends AbstractAsynchronousScheduler { public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() { @Override public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, - IIoOperationFailedCallback callback) { - return new AsynchronousScheduler(threadFactory, callback); + IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges, + int maxNumRunningMerges) { + return new AsynchronousScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumRunningMerges); } + @Override public String getName() { return "async"; } }; - public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) { - super(threadFactory, callback); + private final int maxNumRunningMerges; + private int numRunningMerges = 0; + + public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback, + int maxNumRunningFlushes, int maxNumRunningMerges) { + super(threadFactory, callback, maxNumRunningFlushes); + this.maxNumRunningMerges = maxNumRunningMerges; } @Override protected void scheduleMerge(ILSMIOOperation operation) { - executor.submit(operation); + synchronized (executor) { + if (numRunningMerges >= maxNumRunningMerges) { + waitingMergeOperations.add(operation); + } else { + doScheduleMerge(operation); + } + } } @Override - public void completeOperation(ILSMIOOperation operation) { - // no op + protected void completeMerge(ILSMIOOperation operation) { + synchronized (executor) { + --numRunningMerges; + if (!waitingMergeOperations.isEmpty() && numRunningMerges < maxNumRunningMerges) { + doScheduleMerge(waitingMergeOperations.poll()); + } + } + } + + private void doScheduleMerge(ILSMIOOperation operation) { + ++numRunningMerges; + executor.submit(operation); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java index 742ae24..f3afa43 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java @@ -18,85 +18,141 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadFactory; import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerFactory; /** - * This is a greedy asynchronous scheduler that always allocates the full bandwidth for the merge operation - * with the smallest required disk bandwidth to minimize the number of disk components. It has been proven - * that if the number of components in all merge operations are the same, then this scheduler is optimal - * by always minimizing the number of disk components over time; if not, this is still a good heuristic + * Under the greedy scheduler, a merge operation has the following lifecycles. When the merge policy submits a + * merge operation to the greedy scheduler, the merge operation is SCHEDULED if the number of scheduled merge + * operations is smaller than maxNumScheduledMergeOperations; otherwise, the merge operation is WAITING and is + * stored into a queue. WAITING merge operations will be scheduled after some existing merge operations finish + * in a FIFO order. + * + * The greedy scheduler always runs at most one (and smallest) merge operation for each LSM-tree. The maximum number of + * running merge operations is controlled by maxNumRunningMergeOperations. A SCHEDULED merge operation can become + * RUNNING if the greedy scheduler resumes this merge operation, and a RUNNING merge operation can become SCHEDULED + * if the greedy scheduler pauses this merge operation. * */ public class GreedyScheduler extends AbstractAsynchronousScheduler { - public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() { + public static ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() { @Override public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, - IIoOperationFailedCallback callback) { - return new GreedyScheduler(threadFactory, callback); + IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges, + int maxNumRunningMerges) { + return new GreedyScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumScheduledMerges, + maxNumRunningMerges); } + @Override public String getName() { return "greedy"; } }; - private final Map<String, List<ILSMIOOperation>> mergeOperations = new HashMap<>(); + private final int maxNumScheduledMerges; + private final int maxNumRunningMerges; + + private int numScheduledMerges; + private final Map<String, Set<ILSMIOOperation>> scheduledMergeOperations = new HashMap<>(); + private final Map<String, ILSMIOOperation> runningMergeOperations = new HashMap<>(); - public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) { - super(threadFactory, callback); + public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback, int maxNumRunningFlushes, + int maxNumScheduledMerges, int maxNumRunningMerges) { + super(threadFactory, callback, maxNumRunningFlushes); + this.maxNumScheduledMerges = maxNumScheduledMerges; + this.maxNumRunningMerges = maxNumRunningMerges; } + @Override protected void scheduleMerge(ILSMIOOperation operation) { operation.pause(); - String id = operation.getIndexIdentifier(); synchronized (executor) { - List<ILSMIOOperation> mergeOpList = mergeOperations.computeIfAbsent(id, key -> new ArrayList<>()); - mergeOpList.add(operation); - dispatchMergeOperation(mergeOpList); + if (numScheduledMerges >= maxNumScheduledMerges) { + waitingMergeOperations.add(operation); + } else { + doScheduleMerge(operation); + } } + } + + private void doScheduleMerge(ILSMIOOperation operation) { + String indexIdentier = operation.getIndexIdentifier(); + Set<ILSMIOOperation> mergeOps = scheduledMergeOperations.computeIfAbsent(indexIdentier, k -> new HashSet<>()); + mergeOps.add(operation); executor.submit(operation); + numScheduledMerges++; + + dispatchMergeOperation(indexIdentier, mergeOps); } - private void dispatchMergeOperation(List<ILSMIOOperation> mergeOps) { - ILSMIOOperation activeOp = null; + private void dispatchMergeOperation(String indexIdentier, Set<ILSMIOOperation> mergeOps) { + if (!runningMergeOperations.containsKey(indexIdentier) + && runningMergeOperations.size() >= maxNumRunningMerges) { + return; + } + ILSMIOOperation runningOp = null; ILSMIOOperation smallestMergeOp = null; for (ILSMIOOperation op : mergeOps) { if (op.isActive()) { - activeOp = op; + runningOp = op; } if (smallestMergeOp == null || op.getRemainingPages() < smallestMergeOp.getRemainingPages()) { smallestMergeOp = op; } } - if (smallestMergeOp != activeOp) { - if (activeOp != null) { - activeOp.pause(); + if (smallestMergeOp != runningOp) { + if (runningOp != null) { + runningOp.pause(); } smallestMergeOp.resume(); + runningMergeOperations.put(indexIdentier, smallestMergeOp); } } @Override - public void completeOperation(ILSMIOOperation op) { - if (op.getIOOpertionType() == LSMIOOperationType.MERGE) { - String id = op.getIndexIdentifier(); - synchronized (executor) { - List<ILSMIOOperation> mergeOpList = mergeOperations.get(id); - mergeOpList.remove(op); - if (!mergeOpList.isEmpty()) { - dispatchMergeOperation(mergeOpList); + protected void completeMerge(ILSMIOOperation op) { + String id = op.getIndexIdentifier(); + synchronized (executor) { + Set<ILSMIOOperation> mergeOperations = scheduledMergeOperations.get(id); + mergeOperations.remove(op); + if (mergeOperations.isEmpty()) { + scheduledMergeOperations.remove(id); + } + runningMergeOperations.remove(id); + numScheduledMerges--; + + if (!waitingMergeOperations.isEmpty() && numScheduledMerges < maxNumScheduledMerges) { + doScheduleMerge(waitingMergeOperations.poll()); + } + if (runningMergeOperations.size() < maxNumRunningMerges) { + String indexWithMostScheduledMerges = findIndexWithMostScheduledMerges(); + if (indexWithMostScheduledMerges != null) { + dispatchMergeOperation(indexWithMostScheduledMerges, + scheduledMergeOperations.get(indexWithMostScheduledMerges)); } } } } + + private String findIndexWithMostScheduledMerges() { + String targetIndex = null; + int maxMerges = 0; + for (Map.Entry<String, Set<ILSMIOOperation>> e : scheduledMergeOperations.entrySet()) { + if (!runningMergeOperations.containsKey(e.getKey()) + && (targetIndex == null || maxMerges < e.getValue().size())) { + targetIndex = e.getKey(); + maxMerges = e.getValue().size(); + } + } + return targetIndex; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java index d5354ed..2a48627 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java @@ -18,7 +18,6 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.Deque; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; @@ -40,16 +39,14 @@ public class IoOperationExecutor extends ThreadPoolExecutor { private final IIoOperationFailedCallback callback; private final Map<String, ILSMIOOperation> runningFlushOperations; private final Map<String, Throwable> failedGroups; - private final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations; public IoOperationExecutor(ThreadFactory threadFactory, ILSMIOOperationScheduler scheduler, IIoOperationFailedCallback callback, Map<String, ILSMIOOperation> runningFlushOperations, - Map<String, Deque<ILSMIOOperation>> waitingFlushOperations, Map<String, Throwable> failedGroups) { + Map<String, Throwable> failedGroups) { super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory); this.scheduler = scheduler; this.callback = callback; this.runningFlushOperations = runningFlushOperations; - this.waitingFlushOperations = waitingFlushOperations; this.failedGroups = failedGroups; } @@ -80,20 +77,6 @@ public class IoOperationExecutor extends ThreadPoolExecutor { executedOp.complete(); // destroy if merge or successful flush } scheduler.completeOperation(executedOp); - if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) { - String id = executedOp.getIndexIdentifier(); - synchronized (this) { - runningFlushOperations.remove(id); - if (waitingFlushOperations.containsKey(id)) { - ILSMIOOperation op = waitingFlushOperations.get(id).poll(); - if (op != null) { - scheduler.scheduleOperation(op); - } else { - waitingFlushOperations.remove(id); - } - } - } - } } private void fail(ILSMIOOperation executedOp, Throwable t) throws HyracksDataException { @@ -106,16 +89,6 @@ public class IoOperationExecutor extends ThreadPoolExecutor { String id = executedOp.getIndexIdentifier(); failedGroups.put(id, t); runningFlushOperations.remove(id); - if (waitingFlushOperations.containsKey(id)) { - Deque<ILSMIOOperation> ops = waitingFlushOperations.remove(id); - ILSMIOOperation next = ops.poll(); - while (next != null) { - next.setFailure(new RuntimeException("Operation group " + id + " has permanently failed", t)); - next.setStatus(LSMIOOperationStatus.FAILURE); - next.complete(); - next = ops.poll(); - } - } } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java index 7351bdf..036ade2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java @@ -158,4 +158,9 @@ public class NoOpIoOperation implements ILSMIOOperation { return false; } + @Override + public boolean isCompleted() { + return true; + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java index 8adf5f7..4ab57c5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java @@ -194,4 +194,9 @@ class TracedIOOperation implements ILSMIOOperation { public boolean isActive() { return ioOp.isActive(); } + + @Override + public boolean isCompleted() { + return ioOp.isCompleted(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java index b4e4d84..7f8fd8a 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java @@ -250,7 +250,8 @@ public class LSMBTreeComponentLifecycleTest { public void operationFailed(ILSMIOOperation operation, Throwable failure) { LOGGER.log(Level.ERROR, "Operation {} failed", operation, failure); } - }), new EncapsulatingIoCallbackFactory(harness.getIOOperationCallbackFactory(), NoOpTestCallback.get(), + }, Integer.MAX_VALUE, Integer.MAX_VALUE), + new EncapsulatingIoCallbackFactory(harness.getIOOperationCallbackFactory(), NoOpTestCallback.get(), NoOpTestCallback.get(), new ITestOpCallback<ILSMIOOperation>() { @Override public void before(ILSMIOOperation t) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java index 3f36a34..f487bf1 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java @@ -121,7 +121,7 @@ public class LSMTreeRunner implements IExperimentRunner { public void schedulerFailed(ILSMIOOperationScheduler scheduler, Throwable failure) { ExitUtil.exit(ExitUtil.EC_IO_SCHEDULER_FAILED); } - }); + }, Integer.MAX_VALUE, Integer.MAX_VALUE); lsmtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, new NoMergePolicy(), diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java deleted file mode 100644 index d03f7a5..0000000 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.storage.am.lsm.common.test; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; -import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler; -import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class GreedySchedulerTest { - - private static final String INDEX_1 = "index1"; - private static final String INDEX_2 = "index2"; - - private final Object lock = new Object(); - - @Test - public void test() throws Exception { - GreedyScheduler scheduler = new GreedyScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE); - AtomicBoolean active1 = new AtomicBoolean(true); - ILSMIOOperation op1 = mockMergeOperation(INDEX_1, 10, active1); - - scheduler.scheduleOperation(op1); - // op1 is activated - Assert.assertTrue(active1.get()); - - AtomicBoolean active2 = new AtomicBoolean(true); - ILSMIOOperation op2 = mockMergeOperation(INDEX_2, 5, active2); - scheduler.scheduleOperation(op2); - // op2 does not interactive with op1s - Assert.assertTrue(active1.get()); - Assert.assertTrue(active2.get()); - - scheduler.completeOperation(op2); - Assert.assertTrue(active1.get()); - - AtomicBoolean active3 = new AtomicBoolean(true); - ILSMIOOperation op3 = mockMergeOperation(INDEX_1, 5, active3); - scheduler.scheduleOperation(op3); - Assert.assertTrue(active3.get()); - Assert.assertFalse(active1.get()); - - AtomicBoolean active4 = new AtomicBoolean(true); - ILSMIOOperation op4 = mockMergeOperation(INDEX_1, 7, active4); - scheduler.scheduleOperation(op4); - // op3 is still active - Assert.assertFalse(active1.get()); - Assert.assertTrue(active3.get()); - Assert.assertFalse(active4.get()); - - // suppose op1 is completed (though unlikely in practice), now op3 is still active - scheduler.completeOperation(op1); - Assert.assertTrue(active3.get()); - Assert.assertFalse(active4.get()); - - // op3 completed, op4 is active - scheduler.completeOperation(op3); - Assert.assertTrue(active4.get()); - - synchronized (lock) { - lock.notifyAll(); - } - scheduler.close(); - } - - private ILSMIOOperation mockMergeOperation(String index, long remainingPages, AtomicBoolean isActive) - throws HyracksDataException { - ILSMIOOperation mergeOp = Mockito.mock(ILSMIOOperation.class); - Mockito.when(mergeOp.getIndexIdentifier()).thenReturn(index); - Mockito.when(mergeOp.getIOOpertionType()).thenReturn(LSMIOOperationType.MERGE); - Mockito.when(mergeOp.getRemainingPages()).thenReturn(remainingPages); - - Mockito.doAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - return isActive.get(); - } - }).when(mergeOp).isActive(); - Mockito.doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - isActive.set(true); - return null; - } - }).when(mergeOp).resume(); - - Mockito.doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - isActive.set(false); - return null; - } - }).when(mergeOp).pause(); - - Mockito.doAnswer(new Answer<LSMIOOperationStatus>() { - @Override - public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable { - synchronized (lock) { - lock.wait(); - } - return LSMIOOperationStatus.SUCCESS; - } - }).when(mergeOp).call(); - return mergeOp; - - } - -} diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java new file mode 100644 index 0000000..15f65a4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.test; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; +import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler; +import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler; +import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback; +import org.apache.hyracks.storage.am.lsm.common.test.IoSchedulerTest.MockedOperation; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class IoSchedulerTest { + + protected static final String INDEX_1 = "index1"; + protected static final String INDEX_2 = "index2"; + protected static final String INDEX_3 = "index3"; + protected static final String INDEX_4 = "index4"; + + protected static class MockedOperation { + public final ILSMIOOperation operation; + public final AtomicBoolean scheduled = new AtomicBoolean(); + public final AtomicBoolean running = new AtomicBoolean(); + + public final Semaphore completedSemaphore = new Semaphore(0); + + public MockedOperation(ILSMIOOperation mergeOp) { + this.operation = mergeOp; + } + + public void waitForScheduled() throws InterruptedException { + synchronized (scheduled) { + while (!scheduled.get()) { + scheduled.wait(); + } + } + } + + public void waitForRunning() throws InterruptedException { + synchronized (running) { + while (!running.get()) { + running.wait(); + } + } + } + + } + + @Test + public void testFlush() throws Exception { + int maxRunningFlushes = 2; + + AsynchronousScheduler scheduler = (AsynchronousScheduler) AsynchronousScheduler.FACTORY + .createIoScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE, maxRunningFlushes, 0, 0); + + MockedOperation op1_1 = mockFlushOperation(INDEX_1); + scheduler.scheduleOperation(op1_1.operation); + op1_1.waitForScheduled(); + + MockedOperation op1_2 = mockFlushOperation(INDEX_1); + scheduler.scheduleOperation(op1_2.operation); + Assert.assertFalse(op1_2.scheduled.get()); + + MockedOperation op2_1 = mockFlushOperation(INDEX_2); + scheduler.scheduleOperation(op2_1.operation); + op2_1.waitForScheduled(); + + MockedOperation op2_2 = mockFlushOperation(INDEX_2); + scheduler.scheduleOperation(op2_2.operation); + Assert.assertFalse(op2_2.scheduled.get()); + + // complete op1_1 + op1_1.completedSemaphore.release(); + op1_2.waitForScheduled(); + + // complete op1_2 + op1_2.completedSemaphore.release(); + Assert.assertFalse(op2_2.scheduled.get()); + + // complete op2_1 + op2_1.completedSemaphore.release(); + op2_2.waitForScheduled(); + + scheduler.close(); + } + + @Test + public void testAsynchronousMerge() throws Exception { + int maxRunningMerges = 2; + + AsynchronousScheduler scheduler = + (AsynchronousScheduler) AsynchronousScheduler.FACTORY.createIoScheduler(r -> new Thread(r), + NoOpIoOperationFailedCallback.INSTANCE, 0, maxRunningMerges, maxRunningMerges); + + MockedOperation op1 = mockMergeOperation(INDEX_1, 10); + scheduler.scheduleOperation(op1.operation); + // op1 is scheduled + op1.waitForScheduled(); + + MockedOperation op2 = mockMergeOperation(INDEX_2, 10); + scheduler.scheduleOperation(op2.operation); + // op2 is scheduled + op2.waitForScheduled(); + + MockedOperation op3 = mockMergeOperation(INDEX_3, 10); + scheduler.scheduleOperation(op3.operation); + // op3 is waiting + Assert.assertFalse(op3.scheduled.get()); + Assert.assertFalse(op3.running.get()); + + MockedOperation op4 = mockMergeOperation(INDEX_4, 10); + scheduler.scheduleOperation(op4.operation); + // op4 is waiting + Assert.assertFalse(op4.scheduled.get()); + Assert.assertFalse(op4.running.get()); + + // complete op2 and wait for op3 + op2.completedSemaphore.release(); + op3.waitForScheduled(); + + // complete op3 and wait for op4 + op3.completedSemaphore.release(); + op4.waitForScheduled(); + + scheduler.close(); + } + + @Test + public void testGreedyMerge() throws Exception { + int maxScheduledMerges = 5; + int maxRunningMerges = 2; + + GreedyScheduler scheduler = (GreedyScheduler) GreedyScheduler.FACTORY.createIoScheduler(r -> new Thread(r), + NoOpIoOperationFailedCallback.INSTANCE, 0, maxScheduledMerges, maxRunningMerges); + + MockedOperation op1_1 = mockMergeOperation(INDEX_1, 10); + scheduler.scheduleOperation(op1_1.operation); + // op1_1 is running + op1_1.waitForScheduled(); + op1_1.waitForRunning(); + + MockedOperation op2 = mockMergeOperation(INDEX_2, 10); + scheduler.scheduleOperation(op2.operation); + // op2 is running + op2.waitForScheduled(); + op2.waitForRunning(); + + MockedOperation op3_1 = mockMergeOperation(INDEX_3, 10); + scheduler.scheduleOperation(op3_1.operation); + // op3_1 is scheduled, but not running + op3_1.waitForScheduled(); + Assert.assertFalse(op3_1.running.get()); + + MockedOperation op3_2 = mockMergeOperation(INDEX_3, 5); + scheduler.scheduleOperation(op3_2.operation); + // op3_2 is scheduled, but not running + op3_2.waitForScheduled(); + Assert.assertFalse(op3_2.running.get()); + + MockedOperation op4 = mockMergeOperation(INDEX_4, 10); + scheduler.scheduleOperation(op4.operation); + // op4 is scheduled, but not running + op4.waitForScheduled(); + Assert.assertFalse(op4.running.get()); + + MockedOperation op1_2 = mockMergeOperation(INDEX_1, 5); + scheduler.scheduleOperation(op1_2.operation); + // op1_2 is waiting, not scheduled + Assert.assertFalse(op1_2.scheduled.get()); + Assert.assertFalse(op1_2.running.get()); + + // complete op2 + op2.completedSemaphore.release(); + + // op1_2 preempts op1_1 because op1_2 is smaller + op1_2.waitForRunning(); + op1_2.waitForScheduled(); + + // op3_2 is running because index3 has more merges than index4 + op3_2.waitForRunning(); + Assert.assertFalse(op3_1.running.get()); + + scheduler.close(); + } + + protected MockedOperation mockMergeOperation(String index, long remainingPages) throws HyracksDataException { + return mockOperation(index, LSMIOOperationType.MERGE, remainingPages); + } + + protected MockedOperation mockFlushOperation(String index) throws HyracksDataException { + return mockOperation(index, LSMIOOperationType.FLUSH, 0); + } + + protected MockedOperation mockOperation(String index, LSMIOOperationType type, long remainingPages) + throws HyracksDataException { + ILSMIOOperation op = Mockito.mock(ILSMIOOperation.class); + MockedOperation mockedOp = new MockedOperation(op); + Mockito.when(op.getIndexIdentifier()).thenReturn(index); + Mockito.when(op.getIOOpertionType()).thenReturn(type); + Mockito.when(op.getRemainingPages()).thenReturn(remainingPages); + + Mockito.doAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + return mockedOp.running.get(); + } + }).when(op).isActive(); + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + mockedOp.running.set(true); + synchronized (mockedOp.running) { + mockedOp.running.notifyAll(); + } + return null; + } + }).when(op).resume(); + + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + mockedOp.running.set(false); + return null; + } + }).when(op).pause(); + + Mockito.doAnswer(new Answer<LSMIOOperationStatus>() { + @Override + public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable { + mockedOp.scheduled.set(true); + synchronized (mockedOp.scheduled) { + mockedOp.scheduled.notifyAll(); + } + mockedOp.completedSemaphore.acquire(); + return LSMIOOperationStatus.SUCCESS; + } + }).when(op).call(); + return mockedOp; + + } + +}
